This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b741b6b7da2 refactor: migrate to ScanV2Internal API and remove ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN config (#17520)
5b741b6b7da2 is described below

commit 5b741b6b7da2ff63e6d62abceba49b0108215186
Author: Surya Prasanna <[email protected]>
AuthorDate: Tue Feb 3 12:18:29 2026 -0800

    refactor: migrate to ScanV2Internal API and remove ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN config (#17520)
---
 .../cli/commands/TestHoodieLogFileCommand.java     |   1 -
 .../apache/hudi/config/HoodieCompactionConfig.java |   7 -
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 -
 .../org/apache/hudi/index/HoodieIndexUtils.java    |   1 -
 .../hudi/io/FileGroupReaderBasedAppendHandle.java  |   2 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |   3 +-
 .../hudi/metadata/HoodieMetadataWriteUtils.java    |   5 +-
 .../strategy/ClusteringExecutionStrategy.java      |   7 +-
 .../GenericRecordValidationTestUtils.java          |   2 -
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |   1 -
 .../MultipleSparkJobExecutionStrategy.java         |   3 +-
 .../client/utils/SparkMetadataWriterUtils.java     |   1 -
 .../client/TestSparkRDDMetadataWriteClient.java    |   1 -
 .../functional/TestHoodieBackedTableMetadata.java  |   1 -
 .../hudi/common/config/HoodieReaderConfig.java     |   8 -
 .../table/log/AbstractHoodieLogRecordScanner.java  | 168 +------------------
 .../table/log/BaseHoodieLogRecordReader.java       | 186 +--------------------
 .../table/log/HoodieLogBlockMetadataScanner.java   |   8 +-
 .../table/log/HoodieMergedLogRecordReader.java     |  12 +-
 .../table/log/HoodieMergedLogRecordScanner.java    |  13 +-
 .../table/log/HoodieUnMergedLogRecordScanner.java  |  14 +-
 .../common/table/read/HoodieFileGroupReader.java   |  12 --
 .../hudi/common/table/read/ReaderParameters.java   |  16 +-
 .../read/buffer/LogScanningRecordBufferLoader.java |   1 -
 .../hudi/metadata/HoodieBackedTableMetadata.java   |   1 -
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  24 +--
 .../org/apache/hudi/table/format/FormatUtils.java  |   1 -
 .../common/functional/TestHoodieLogFormat.java     | 124 +++++---------
 .../HoodieTimestampAwareParquetInputFormat.java    |   5 +-
 .../realtime/RealtimeCompactedRecordReader.java    |   3 -
 .../apache/hudi/hadoop/TestHoodieHiveRecord.java   |  12 +-
 .../reader/DFSHoodieDatasetInputReader.java        |   2 -
 .../org/apache/hudi/cdc/CDCFileGroupIterator.scala |   3 -
 .../procedures/PartitionBucketIndexManager.scala   |   7 +-
 .../TestHoodieClientOnMergeOnReadStorage.java      |   1 -
 .../TestMetadataUtilRLIandSIRecordGeneration.java  |   5 +-
 .../hudi/functional/TestHoodieBackedMetadata.java  |   1 -
 37 files changed, 90 insertions(+), 576 deletions(-)

diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 64f0bcd41a6f..51bd2c2843f3 100644
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -249,7 +249,6 @@ public class TestHoodieLogFileCommand extends 
CLIFunctionalTestHarness {
         .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
         
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
         
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-        
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
         .build();
 
     Iterator<HoodieRecord> records = scanner.iterator();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 79b9c962ebda..5a60ebbcbc76 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -35,8 +35,6 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.Properties;
 
-import static 
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
-
 /**
  * Compaction related config.
  */
@@ -476,11 +474,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withEnableOptimizedLogBlocksScan(String 
enableOptimizedLogBlocksScan) {
-      compactionConfig.setValue(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, 
enableOptimizedLogBlocksScan);
-      return this;
-    }
-
     public Builder withCompactionSpecifyPartitionPathRegex(String 
partitionPathRegex) {
       compactionConfig.setValue(COMPACTION_SPECIFY_PARTITION_PATH_REGEX, 
partitionPathRegex);
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index beda2612b275..4789af764bb1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1714,10 +1714,6 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(HoodieCompactionConfig.LOG_COMPACTION_BLOCKS_THRESHOLD);
   }
 
-  public boolean enableOptimizedLogBlocksScan() {
-    return getBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
-  }
-
   public HoodieCleaningPolicy getCleanerPolicy() {
     return HoodieCleaningPolicy.valueOf(getString(CLEANER_POLICY));
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 39e7369fd312..c9965a8c20ed 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -361,7 +361,6 @@ public class HoodieIndexUtils {
           .withRequestedSchema(dataSchema)
           .withInternalSchema(internalSchemaOption)
           .withProps(metaClient.getTableConfig().getProps())
-          
.withEnableOptimizedLogBlockScan(config.enableOptimizedLogBlocksScan())
           .build();
       try {
         final HoodieRecordLocation currentLocation = new 
HoodieRecordLocation(fileSlice.getBaseInstantTime(), fileSlice.getFileId());
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
index ac8cff79e18c..9f65a5d2f477 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
@@ -88,7 +88,7 @@ public class FileGroupReaderBasedAppendHandle<T, I, K, O> 
extends HoodieAppendHa
         
.withRequestedSchema(writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props).withEmitDelete(true)
         
.withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
         // instead of using config.enableOptimizedLogBlocksScan(), we set to 
true as log compaction blocks only supported in scanV2
-        .withEnableOptimizedLogBlockScan(true).build()) {
+        .build()) {
       recordItr = new 
CloseableMappingIterator<>(fileGroupReader.getLogRecordsOnly(), record -> {
         HoodieRecord<T> hoodieRecord = 
readerContext.getRecordContext().constructHoodieRecord(record);
         hoodieRecord.setCurrentLocation(newRecordLocation);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 98145b660a86..a78fd998866e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -886,7 +886,6 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
           .withInternalSchema(internalSchemaOption)
           .withShouldUseRecordPosition(false)
           .withProps(metaClient.getTableConfig().getProps())
-          
.withEnableOptimizedLogBlockScan(dataWriteConfig.enableOptimizedLogBlocksScan())
           .build();
       String baseFileInstantTime = fileSlice.getBaseInstantTime();
       return new 
CloseableMappingIterator<>(fileGroupReader.getClosableIterator(), record -> {
@@ -1497,7 +1496,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
               dataWriteConfig.getMetadataConfig(),
               partitionsToUpdate, dataWriteConfig.getBloomFilterType(),
               dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.getWritesFileIdEncoding(), getEngineType(),
-              Option.of(dataWriteConfig.getRecordMerger().getRecordType()), 
dataWriteConfig.enableOptimizedLogBlocksScan());
+              Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
 
       // Updates for record index are created by parsing the WriteStatus which 
is a hudi-client object. Hence, we cannot yet move this code
       // to the HoodieTableMetadataUtil class in hudi-common.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index b3910def8585..777049a62c46 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -217,7 +217,6 @@ public class HoodieMetadataWriteUtils {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withInlineCompaction(false)
             
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
-            
.withEnableOptimizedLogBlocksScan(String.valueOf(writeConfig.enableOptimizedLogBlocksScan()))
             // Compaction on metadata table is used as a barrier for archiving 
on main dataset and for validating the
             // deltacommits having corresponding completed commits. Therefore, 
we need to compact all fileslices of all
             // partitions together requiring UnBoundedCompactionStrategy.
@@ -381,7 +380,7 @@ public class HoodieMetadataWriteUtils {
                                                                                
String instantTime, HoodieTableMetaClient dataMetaClient, HoodieTableMetadata 
tableMetadata,
                                                                                
HoodieMetadataConfig metadataConfig, Set<String> enabledPartitionTypes, String 
bloomFilterType,
                                                                                
int bloomIndexParallelism, int writesFileIdEncoding, EngineType engineType,
-                                                                               
Option<HoodieRecord.HoodieRecordType> recordTypeOpt, boolean 
enableOptimizeLogBlocksScan) {
+                                                                               
Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
     final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new 
HashMap<>();
     final HoodieData<HoodieRecord> filesPartitionRecordsRDD = 
context.parallelize(
         convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 
1);
@@ -409,7 +408,7 @@ public class HoodieMetadataWriteUtils {
     }
     if 
(enabledPartitionTypes.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath()))
 {
       
partitionToRecordsMap.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
 convertMetadataToRecordIndexRecords(context, commitMetadata, metadataConfig,
-          dataMetaClient, writesFileIdEncoding, instantTime, engineType, 
enableOptimizeLogBlocksScan));
+          dataMetaClient, writesFileIdEncoding, instantTime, engineType));
     }
     return partitionToRecordsMap;
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
index bc7fec18a564..d34a85a97bd4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
@@ -96,11 +96,10 @@ public abstract class ClusteringExecutionStrategy<T, I, K, 
O> implements Seriali
 
     FileSlice fileSlice = 
clusteringOperationToFileSlice(table.getMetaClient().getBasePath().toString(), 
operation);
     final boolean usePosition = 
getWriteConfig().getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
-    final boolean enableLogBlocksScan = 
getWriteConfig().enableOptimizedLogBlocksScan();
     Option<InternalSchema> internalSchema = 
SerDeHelper.fromJson(getWriteConfig().getInternalSchema());
     try {
       return getFileGroupReader(table.getMetaClient(), fileSlice, 
readerSchemaWithMetaFields, internalSchema,
-              readerContextFactory, instantTime, props, usePosition, 
enableLogBlocksScan).getClosableHoodieRecordIterator();
+              readerContextFactory, instantTime, props, 
usePosition).getClosableHoodieRecordIterator();
     } catch (IOException e) {
       throw new HoodieClusteringException("Error reading file slices", e);
     }
@@ -141,11 +140,11 @@ public abstract class ClusteringExecutionStrategy<T, I, 
K, O> implements Seriali
 
   protected static <R> HoodieFileGroupReader<R> 
getFileGroupReader(HoodieTableMetaClient metaClient, FileSlice fileSlice, 
HoodieSchema readerSchema, Option<InternalSchema> internalSchemaOption,
                                                                    
ReaderContextFactory<R> readerContextFactory, String instantTime,
-                                                                   
TypedProperties properties, boolean usePosition, boolean enableLogBlocksScan) {
+                                                                   
TypedProperties properties, boolean usePosition) {
     HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
     return HoodieFileGroupReader.<R>newBuilder()
         
.withReaderContext(readerContext).withHoodieTableMetaClient(metaClient).withLatestCommitTime(instantTime)
         
.withFileSlice(fileSlice).withDataSchema(readerSchema).withRequestedSchema(readerSchema).withInternalSchema(internalSchemaOption)
-        
.withShouldUseRecordPosition(usePosition).withEnableOptimizedLogBlockScan(enableLogBlocksScan).withProps(properties).build();
+        
.withShouldUseRecordPosition(usePosition).withProps(properties).build();
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
index f880c35594fd..77b146d38506 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.testutils;
 
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
@@ -113,7 +112,6 @@ public class GenericRecordValidationTestUtils {
         .collect(Collectors.toList());
 
     jobConf.set(String.format(HOODIE_CONSUME_COMMIT, config.getTableName()), 
instant1);
-    jobConf.set(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(), 
"true");
     List<GenericRecord> records = 
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         storageConf, fullPartitionPaths, config.getBasePath(), jobConf, true);
     Map<String, GenericRecord> prevRecordsMap = records.stream()
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 3525995fb540..7d174414214a 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -955,7 +955,6 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
         .withRequestedSchema(schema)
         .withDataSchema(schema)
         .withProps(new TypedProperties())
-        
.withEnableOptimizedLogBlockScan(writeConfig.getMetadataConfig().isOptimizedLogBlocksScanEnabled())
         .build();
 
     try (ClosableIterator<HoodieRecord<IndexedRecord>> iter = 
fileGroupReader.getClosableHoodieRecordIterator()) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index be6d516a303f..9976793199d8 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -300,7 +300,6 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
     long maxMemoryPerCompaction = 
IOUtils.getMaxMemoryPerCompaction(getEngineContext().getTaskContextSupplier(), 
writeConfig);
     TypedProperties readerProperties = 
getReaderProperties(maxMemoryPerCompaction);
     final boolean usePosition = 
getWriteConfig().getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
-    final boolean enableLogBlocksScan = 
getWriteConfig().enableOptimizedLogBlocksScan();
     String internalSchemaStr = getWriteConfig().getInternalSchema();
 
     // broadcast reader context.
@@ -317,7 +316,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
 
         // instantiate FG reader
         HoodieFileGroupReader<InternalRow> fileGroupReader = 
getFileGroupReader(metaClient, fileSlice, tableSchemaWithMetaFields, 
internalSchemaOption,
-            readerContextFactory, instantTime, readerProperties, usePosition, 
enableLogBlocksScan);
+            readerContextFactory, instantTime, readerProperties, usePosition);
         // read records from the FG reader
         return 
CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator());
       }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 351512ed0940..47155899e204 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -346,7 +346,6 @@ public class SparkMetadataWriterUtils {
         .withBaseFileOption(baseFileOption)
         .withLogFiles(logFileStream)
         .withPartitionPath(partition)
-        
.withEnableOptimizedLogBlockScan(dataWriteConfig.enableOptimizedLogBlocksScan())
         .build();
     try {
       ClosableIterator<InternalRow> rowsForFilePath = 
fileGroupReader.getClosableIterator();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
index e2cb75524f58..8a88dd813588 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
@@ -226,7 +226,6 @@ public class TestSparkRDDMetadataWriteClient extends 
HoodieClientTestBase {
         .withRequestedSchema(metadataSchema)
         .withDataSchema(schema)
         .withProps(new TypedProperties())
-        
.withEnableOptimizedLogBlockScan(hoodieWriteConfig.getMetadataConfig().isOptimizedLogBlocksScanEnabled())
         .build();
     try (ClosableIterator<HoodieRecord<IndexedRecord>> records = 
fileGroupReader.getClosableHoodieRecordIterator()) {
       Map<String, HoodieRecord<HoodieMetadataPayload>> actualMdtRecordMap = 
new HashMap<>();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 353d94904ceb..6bb71f07de61 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -570,7 +570,6 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
         .withRequestedSchema(schema)
         .withDataSchema(schema)
         .withProps(new TypedProperties())
-        
.withEnableOptimizedLogBlockScan(metadataTableWriteConfig.getMetadataConfig().isOptimizedLogBlocksScanEnabled())
         .build();
 
     try (ClosableIterator<HoodieRecord<IndexedRecord>> iter = 
fileGroupReader.getClosableHoodieRecordIterator()) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index 702309874261..1d53286f3747 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -45,14 +45,6 @@ public class HoodieReaderConfig extends HoodieConfig {
       .withDocumentation("HoodieLogFormatReader reads a logfile in the forward 
direction starting from pos=0 to pos=file_length. "
           + "If this config is set to true, the reader reads the logfile in 
reverse direction, from pos=file_length to pos=0");
 
-  public static final ConfigProperty<String> ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN 
= ConfigProperty
-      .key("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN)
-      .defaultValue("false")
-      .markAdvanced()
-      .sinceVersion("0.13.0")
-      .withDocumentation("New optimized scan for log blocks that handles all 
multi-writer use-cases while appending to log files. "
-          + "It also differentiates original blocks written by ingestion 
writers and compacted blocks written log compaction.");
-
   public static final ConfigProperty<Boolean> FILE_GROUP_READER_ENABLED = 
ConfigProperty
       .key("hoodie.file.group.reader.enabled")
       .defaultValue(true)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 4956f0473631..b1eb9e0e2c98 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -143,8 +143,6 @@ public abstract class AbstractHoodieLogRecordScanner {
   protected final HoodieRecordType recordType;
   // Collect all the block instants after scanning all the log files.
   private final List<String> validBlockInstants = new ArrayList<>();
-  // Use scanV2 method.
-  private final boolean enableOptimizedLogBlocksScan;
   // table version for compatibility
   private final HoodieTableVersion tableVersion;
   // Allows to consider inflight instants while merging log records
@@ -161,7 +159,6 @@ public abstract class AbstractHoodieLogRecordScanner {
                                            Option<String> 
partitionNameOverride,
                                            InternalSchema internalSchema,
                                            Option<String> keyFieldOverride,
-                                           boolean 
enableOptimizedLogBlocksScan,
                                            HoodieRecordMerger recordMerger,
                                            Option<HoodieTableMetaClient> 
hoodieTableMetaClientOption) {
     this.readerSchema = readerSchema;
@@ -192,7 +189,6 @@ public abstract class AbstractHoodieLogRecordScanner {
     this.withOperationField = withOperationField;
     this.forceFullScan = forceFullScan;
     this.internalSchema = internalSchema == null ? 
InternalSchema.getEmptyInternalSchema() : internalSchema;
-    this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
 
     if (keyFieldOverride.isPresent()) {
       // NOTE: This branch specifically is leveraged handling Metadata Table
@@ -243,163 +239,7 @@ public abstract class AbstractHoodieLogRecordScanner {
    * @param keySpecOpt specifies target set of keys to be scanned
    * @param skipProcessingBlocks controls, whether (delta) blocks have to 
actually be processed
    */
-  protected final void scanInternal(Option<KeySpec> keySpecOpt, boolean 
skipProcessingBlocks) {
-    synchronized (this) {
-      if (enableOptimizedLogBlocksScan) {
-        scanInternalV2(keySpecOpt, skipProcessingBlocks);
-      } else {
-        scanInternalV1(keySpecOpt);
-      }
-    }
-  }
-
-  private void scanInternalV1(Option<KeySpec> keySpecOpt) {
-    currentInstantLogBlocks = new ArrayDeque<>();
-
-    progress = 0.0f;
-    totalLogFiles = new AtomicLong(0);
-    totalRollbacks = new AtomicLong(0);
-    totalCorruptBlocks = new AtomicLong(0);
-    totalLogBlocks = new AtomicLong(0);
-    totalLogRecords = new AtomicLong(0);
-    HoodieLogFormatReader logFormatReaderWrapper = null;
-    try {
-      // Iterate over the paths
-      logFormatReaderWrapper = new HoodieLogFormatReader(storage,
-          logFilePaths.stream()
-              .map(filePath -> new HoodieLogFile(new StoragePath(filePath)))
-              .collect(Collectors.toList()),
-          readerSchema, reverseReader, bufferSize, shouldLookupRecords(), 
recordKeyField, internalSchema);
-
-      Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
-      while (logFormatReaderWrapper.hasNext()) {
-        HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
-        LOG.info("Scanning log file {}", logFile);
-        scannedLogFiles.add(logFile);
-        totalLogFiles.set(scannedLogFiles.size());
-        // Use the HoodieLogFileReader to iterate through the blocks in the 
log file
-        HoodieLogBlock logBlock = logFormatReaderWrapper.next();
-        final String instantTime = 
logBlock.getLogBlockHeader().get(INSTANT_TIME);
-        totalLogBlocks.incrementAndGet();
-        if (logBlock.isDataOrDeleteBlock()) {
-          if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT) && 
!allowInflightInstants) {
-            if 
(!getOrCreateCompletedInstantsTimeline().containsOrBeforeTimelineStarts(instantTime)
-                || 
getOrCreateInflightInstantsTimeline().containsInstant(instantTime)) {
-              // hit an uncommitted block possibly from a failed write, move 
to the next one and skip processing this one
-              continue;
-            }
-          }
-          if 
(compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), 
GREATER_THAN, this.latestInstantTime)) {
-            // Skip processing a data or delete block with the instant time 
greater than the latest instant time used by this log record reader
-            continue;
-          }
-          if (instantRange.isPresent() && 
!instantRange.get().isInRange(instantTime)) {
-            // filter the log block by instant range
-            continue;
-          }
-        }
-        switch (logBlock.getBlockType()) {
-          case HFILE_DATA_BLOCK:
-          case AVRO_DATA_BLOCK:
-          case PARQUET_DATA_BLOCK:
-            LOG.info("Reading a data block from file {} at instant {}", 
logFile.getPath(), instantTime);
-            // store the current block
-            currentInstantLogBlocks.push(logBlock);
-            break;
-          case DELETE_BLOCK:
-            LOG.info("Reading a delete block from file {}", logFile.getPath());
-            // store deletes so can be rolled back
-            currentInstantLogBlocks.push(logBlock);
-            break;
-          case COMMAND_BLOCK:
-            // Consider the following scenario
-            // (Time 0, C1, Task T1) -> Running
-            // (Time 1, C1, Task T1) -> Failed (Wrote either a corrupt block 
or a correct
-            // DataBlock (B1) with commitTime C1
-            // (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the 
attempt number is 2)
-            // (Time 3, C1, Task T1.2) -> Finished (Wrote a correct DataBlock 
B2)
-            // Now a logFile L1 can have 2 correct Datablocks (B1 and B2) 
which are the same.
-            // Say, commit C1 eventually failed and a rollback is triggered.
-            // Rollback will write only 1 rollback block (R1) since it assumes 
one block is
-            // written per ingestion batch for a file but in reality we need 
to rollback (B1 & B2)
-            // The following code ensures the same rollback block (R1) is used 
to rollback
-            // both B1 & B2
-            // This is a command block - take appropriate action based on the 
command
-            HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
-            String targetInstantForCommandBlock =
-                logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
-            LOG.info("Reading a command block {} with targetInstantTime {} from file {}", commandBlock.getType(), targetInstantForCommandBlock, logFile.getPath());
-            switch (commandBlock.getType()) { // there can be different types of command blocks
-              case ROLLBACK_BLOCK:
-                // Rollback older read log block(s)
-                // Get commit time from older record blocks, compare with targetCommitTime,
-                // rollback only if equal, this is required in scenarios of invalid/extra
-                // rollback blocks written due to failures during the rollback operation itself
-                // and ensures the same rollback block (R1) is used to rollback both B1 & B2 with
-                // same instant_time.
-                final int instantLogBlockSizeBeforeRollback = currentInstantLogBlocks.size();
-                currentInstantLogBlocks.removeIf(block -> {
-                  // handle corrupt blocks separately since they may not have metadata
-                  if (block.getBlockType() == CORRUPT_BLOCK) {
-                    LOG.info("Rolling back the last corrupted log block read in {}", logFile.getPath());
-                    return true;
-                  }
-                  if (targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME))) {
-                    // rollback older data block or delete block
-                    LOG.info("Rolling back an older log block read from {} with instantTime {}", logFile.getPath(), targetInstantForCommandBlock);
-                    return true;
-                  }
-                  return false;
-                });
-                final int numBlocksRolledBack = instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size();
-                totalRollbacks.addAndGet(numBlocksRolledBack);
-                LOG.info("Number of applied rollback blocks {}", numBlocksRolledBack);
-                if (numBlocksRolledBack == 0) {
-                  LOG.warn("TargetInstantTime {} invalid or extra rollback command block in {}", targetInstantForCommandBlock, logFile.getPath());
-                }
-                break;
-              default:
-                throw new UnsupportedOperationException("Command type not yet supported.");
-            }
-            break;
-          case CORRUPT_BLOCK:
-            LOG.info("Found a corrupt block in {}", logFile.getPath());
-            totalCorruptBlocks.incrementAndGet();
-            // If there is a corrupt block - we will assume that this was the next data block
-            currentInstantLogBlocks.push(logBlock);
-            break;
-          default:
-            throw new UnsupportedOperationException("Block type not supported yet");
-        }
-      }
-      // merge the last read block when all the blocks are done reading
-      if (!currentInstantLogBlocks.isEmpty()) {
-        // if there are no dups, we can take currentInstantLogBlocks as is.
-        LOG.info("Merging the final data blocks");
-        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
-      }
-
-      // Done
-      progress = 1.0f;
-    } catch (IOException e) {
-      LOG.error("Got IOException when reading log file", e);
-      throw new HoodieIOException("IOException when reading log file ", e);
-    } catch (Exception e) {
-      LOG.error("Got exception when reading log file", e);
-      throw new HoodieException("Exception when reading log file ", e);
-    } finally {
-      try {
-        if (null != logFormatReaderWrapper) {
-          logFormatReaderWrapper.close();
-        }
-      } catch (IOException ioe) {
-        // Eat exception as we do not want to mask the original exception that can happen
-        LOG.error("Unable to close log format reader", ioe);
-      }
-    }
-  }
-
-  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
+  protected final synchronized void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
     currentInstantLogBlocks = new ArrayDeque<>();
     progress = 0.0f;
     totalLogFiles = new AtomicLong(0);
@@ -593,7 +433,7 @@ public abstract class AbstractHoodieLogRecordScanner {
       // merge the last read block when all the blocks are done reading
       if (!currentInstantLogBlocks.isEmpty() && !skipProcessingBlocks) {
         LOG.info("Merging the final data blocks");
-        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOption);
+        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
       }
       // Done
       progress = 1.0f;
@@ -893,10 +733,6 @@ public abstract class AbstractHoodieLogRecordScanner {
       throw new UnsupportedOperationException();
     }
 
-    public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
-      throw new UnsupportedOperationException();
-    }
-
     public Builder withKeyFieldOverride(String keyFieldOverride) {
       throw new UnsupportedOperationException();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 3b721c481a00..e242fb2bef50 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -127,8 +127,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
   // Record type read from log block
   // Collect all the block instants after scanning all the log files.
   private final List<String> validBlockInstants = new ArrayList<>();
-  // Use scanV2 method.
-  private final boolean enableOptimizedLogBlocksScan;
   protected HoodieFileGroupRecordBuffer<T> recordBuffer;
   // Allows to consider inflight instants while merging log records
   protected boolean allowInflightInstants;
@@ -139,7 +137,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
                                       List<HoodieLogFile> logFiles,
                                       boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
                                       boolean withOperationField, boolean forceFullScan, Option<String> partitionNameOverride,
-                                      Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieFileGroupRecordBuffer<T> recordBuffer,
+                                      Option<String> keyFieldOverride, HoodieFileGroupRecordBuffer<T> recordBuffer,
                                       boolean allowInflightInstants) {
     this.readerContext = readerContext;
     this.readerSchema = readerContext.getSchemaHandler() != null ? readerContext.getSchemaHandler().getRequiredSchema() : null;
@@ -164,7 +162,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
     this.withOperationField = withOperationField;
     this.forceFullScan = forceFullScan;
     this.internalSchema = readerContext.getSchemaHandler() != null ? readerContext.getSchemaHandler().getInternalSchema() : null;
-    this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
 
     if (keyFieldOverride.isPresent()) {
       // NOTE: This branch specifically is leveraged handling Metadata Table
@@ -192,180 +189,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
    * @param keySpecOpt           specifies target set of keys to be scanned
    * @param skipProcessingBlocks controls, whether (delta) blocks have to actually be processed
    */
-  protected final void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
-    synchronized (this) {
-      if (enableOptimizedLogBlocksScan) {
-        scanInternalV2(keySpecOpt, skipProcessingBlocks);
-      } else {
-        scanInternalV1(keySpecOpt);
-      }
-    }
-  }
-
-  private void scanInternalV1(Option<KeySpec> keySpecOpt) {
-    currentInstantLogBlocks = new ArrayDeque<>();
-
-    progress = 0.0f;
-    totalLogFiles = new AtomicLong(0);
-    totalRollbacks = new AtomicLong(0);
-    totalCorruptBlocks = new AtomicLong(0);
-    totalLogBlocks = new AtomicLong(0);
-    totalLogRecords = new AtomicLong(0);
-    HoodieLogFormatReader logFormatReaderWrapper = null;
-    try {
-      // Iterate over the paths
-      logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFiles,
-          readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
-
-      Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
-      while (logFormatReaderWrapper.hasNext()) {
-        HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
-        LOG.debug("Scanning log file {}", logFile);
-        scannedLogFiles.add(logFile);
-        totalLogFiles.set(scannedLogFiles.size());
-        // Use the HoodieLogFileReader to iterate through the blocks in the log file
-        HoodieLogBlock logBlock = logFormatReaderWrapper.next();
-        final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
-        totalLogBlocks.incrementAndGet();
-        if (logBlock.isDataOrDeleteBlock()) {
-          if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT) && !allowInflightInstants) {
-            HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
-            if (commitsTimeline.filterInflights().containsInstant(instantTime)
-                || !commitsTimeline.filterCompletedInstants().containsOrBeforeTimelineStarts(instantTime)) {
-              // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
-              continue;
-            }
-          }
-          if (compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), GREATER_THAN, this.latestInstantTime)) {
-            // Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader
-            continue;
-          }
-          if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
-            // filter the log block by instant range
-            continue;
-          }
-        }
-
-        switch (logBlock.getBlockType()) {
-          case HFILE_DATA_BLOCK:
-          case AVRO_DATA_BLOCK:
-          case PARQUET_DATA_BLOCK:
-            LOG.debug("Reading a data block from file {} at instant {}", logFile.getPath(), instantTime);
-            // store the current block
-            currentInstantLogBlocks.push(logBlock);
-            break;
-          case DELETE_BLOCK:
-            LOG.debug("Reading a delete block from file {}", logFile.getPath());
-            // store deletes so can be rolled back
-            currentInstantLogBlocks.push(logBlock);
-            break;
-          case COMMAND_BLOCK:
-            // Consider the following scenario
-            // (Time 0, C1, Task T1) -> Running
-            // (Time 1, C1, Task T1) -> Failed (Wrote either a corrupt block or a correct
-            // DataBlock (B1) with commitTime C1
-            // (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the attempt number is 2)
-            // (Time 3, C1, Task T1.2) -> Finished (Wrote a correct DataBlock B2)
-            // Now a logFile L1 can have 2 correct Datablocks (B1 and B2) which are the same.
-            // Say, commit C1 eventually failed and a rollback is triggered.
-            // Rollback will write only 1 rollback block (R1) since it assumes one block is
-            // written per ingestion batch for a file but in reality we need to rollback (B1 & B2)
-            // The following code ensures the same rollback block (R1) is used to rollback
-            // both B1 & B2
-            // This is a command block - take appropriate action based on the command
-            HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
-            String targetInstantForCommandBlock =
-                logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
-            LOG.debug("Reading a command block {} with targetInstantTime {} from file {}", commandBlock.getType(), targetInstantForCommandBlock,
-                logFile.getPath());
-            switch (commandBlock.getType()) { // there can be different types of command blocks
-              case ROLLBACK_BLOCK:
-                // Rollback older read log block(s)
-                // Get commit time from older record blocks, compare with targetCommitTime,
-                // rollback only if equal, this is required in scenarios of invalid/extra
-                // rollback blocks written due to failures during the rollback operation itself
-                // and ensures the same rollback block (R1) is used to rollback both B1 & B2 with
-                // same instant_time.
-                final int instantLogBlockSizeBeforeRollback = currentInstantLogBlocks.size();
-                currentInstantLogBlocks.removeIf(block -> {
-                  // handle corrupt blocks separately since they may not have metadata
-                  if (block.getBlockType() == CORRUPT_BLOCK) {
-                    LOG.debug("Rolling back the last corrupted log block read in {}", logFile.getPath());
-                    return true;
-                  }
-                  if (targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME))) {
-                    // rollback older data block or delete block
-                    LOG.debug("Rolling back an older log block read from {} with instantTime {}",
-                        logFile.getPath(), targetInstantForCommandBlock);
-                    return true;
-                  }
-                  return false;
-                });
-
-                final int numBlocksRolledBack = instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size();
-                totalRollbacks.addAndGet(numBlocksRolledBack);
-                LOG.debug("Number of applied rollback blocks {}", numBlocksRolledBack);
-                if (numBlocksRolledBack == 0) {
-                  LOG.warn("TargetInstantTime {} invalid or extra rollback command block in {}",
-                      targetInstantForCommandBlock, logFile.getPath());
-                }
-                break;
-              default:
-                throw new UnsupportedOperationException("Command type not yet supported.");
-            }
-            break;
-          case CORRUPT_BLOCK:
-            LOG.debug("Found a corrupt block in {}", logFile.getPath());
-            totalCorruptBlocks.incrementAndGet();
-            // If there is a corrupt block - we will assume that this was the next data block
-            currentInstantLogBlocks.push(logBlock);
-            break;
-          default:
-            throw new UnsupportedOperationException("Block type not supported yet");
-        }
-      }
-      // merge the last read block when all the blocks are done reading
-      if (!currentInstantLogBlocks.isEmpty()) {
-        LOG.debug("Merging the final data blocks");
-        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
-      }
-
-      // Done
-      progress = 1.0f;
-      totalLogRecords.set(recordBuffer.getTotalLogRecords());
-    } catch (IOException e) {
-      LOG.error("Got IOException when reading log file", e);
-      throw new HoodieIOException("IOException when reading log file ", e);
-    } catch (Exception e) {
-      LOG.error("Got exception when reading log file", e);
-      throw new HoodieException("Exception when reading log file ", e);
-    } finally {
-      try {
-        if (null != logFormatReaderWrapper) {
-          logFormatReaderWrapper.close();
-        }
-      } catch (IOException ioe) {
-        // Eat exception as we do not want to mask the original exception that can happen
-        LOG.error("Unable to close log format reader", ioe);
-      }
-      if (!logFiles.isEmpty()) {
-        try {
-          StoragePath path = logFiles.get(0).getPath();
-          LOG.info("Finished scanning log files. FileId: {}, LogFileInstantTime: {}, "
-                  + "Total log files: {}, Total log blocks: {}, Total rollbacks: {}, Total corrupt blocks: {}",
-              FSUtils.getFileIdFromLogPath(path), FSUtils.getDeltaCommitTimeFromLogPath(path),
-              totalLogFiles.get(), totalLogBlocks.get(), totalRollbacks.get(), totalCorruptBlocks.get());
-        } catch (Exception e) {
-          LOG.warn("Could not extract fileId from log path", e);
-          LOG.info("Finished scanning log files. "
-                  + "Total log files: {}, Total log blocks: {}, Total rollbacks: {}, Total corrupt blocks: {}",
-              totalLogFiles.get(), totalLogBlocks.get(), totalRollbacks.get(), totalCorruptBlocks.get());
-        }
-      }
-    }
-  }
-
-  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
+  protected final synchronized void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
     currentInstantLogBlocks = new ArrayDeque<>();
     progress = 0.0f;
     totalLogFiles = new AtomicLong(0);
@@ -558,7 +382,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
       // merge the last read block when all the blocks are done reading
       if (!currentInstantLogBlocks.isEmpty() && !skipProcessingBlocks) {
         LOG.debug("Merging the final data blocks");
-        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOption);
+        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
       }
       // Done
       progress = 1.0f;
@@ -709,10 +533,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
       throw new UnsupportedOperationException();
     }
 
-    public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
-      throw new UnsupportedOperationException();
-    }
-
     public abstract BaseHoodieLogRecordReader build();
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
index 23d64eed88cb..bfc387be1d24 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
@@ -33,9 +33,11 @@ import java.util.List;
  */
 public class HoodieLogBlockMetadataScanner extends BaseHoodieLogRecordReader<IndexedRecord> {
 
-  public HoodieLogBlockMetadataScanner(HoodieTableMetaClient metaClient, List<HoodieLogFile> logFiles, int bufferSize, String maxInstantTime, Option<InstantRange> instantRange) {
-    super(getReaderContext(metaClient, maxInstantTime), metaClient, metaClient.getStorage(), logFiles, false, bufferSize, instantRange, false, false, Option.empty(), Option.empty(), true,
-        null, false);
+  public HoodieLogBlockMetadataScanner(HoodieTableMetaClient metaClient, List<HoodieLogFile> logFiles,
+                                       int bufferSize, String maxInstantTime, Option<InstantRange> instantRange) {
+    super(getReaderContext(metaClient, maxInstantTime), metaClient, metaClient.getStorage(), logFiles,
+        false, bufferSize, instantRange, false, false, Option.empty(),
+        Option.empty(), null, false);
     scanInternal(Option.empty(), true);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index 4ca30827fb8b..b3f1eeaa988e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -66,10 +66,10 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
   private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient metaClient, HoodieStorage storage,
                                       List<HoodieLogFile> logFiles, boolean reverseReader,
                                       int bufferSize, Option<InstantRange> instantRange, boolean withOperationField, boolean forceFullScan,
-                                      Option<String> partitionName, Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan,
+                                      Option<String> partitionName, Option<String> keyFieldOverride,
                                       HoodieFileGroupRecordBuffer<T> recordBuffer, boolean allowInflightInstants) {
     super(readerContext, metaClient, storage, logFiles, reverseReader, bufferSize, instantRange, withOperationField,
-        forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordBuffer, allowInflightInstants);
+        forceFullScan, partitionName, keyFieldOverride, recordBuffer, allowInflightInstants);
 
     if (forceFullScan) {
       performScan();
@@ -228,12 +228,6 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
       return this;
     }
 
-    @Override
-    public Builder<T> withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
-      this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
-      return this;
-    }
-
     public Builder<T> withKeyFieldOverride(String keyFieldOverride) {
       this.keyFieldOverride = Objects.requireNonNull(keyFieldOverride);
       return this;
@@ -274,7 +268,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
           withOperationField, forceFullScan,
           Option.ofNullable(partitionName),
           Option.ofNullable(keyFieldOverride),
-          enableOptimizedLogBlocksScan, recordBuffer,
+          recordBuffer,
           allowInflightInstants);
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index f6b2a7b51854..151ae92c0ed4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -110,11 +110,11 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
                                          Option<String> partitionName,
                                          InternalSchema internalSchema,
                                          Option<String> keyFieldOverride,
-                                         boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
+                                         HoodieRecordMerger recordMerger,
                                          Option<HoodieTableMetaClient> hoodieTableMetaClientOption,
                                          boolean allowInflightInstants) {
     super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize,
-        instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
+        instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, recordMerger,
         hoodieTableMetaClientOption);
     try {
       this.maxMemorySizeInBytes = maxMemorySizeInBytes;
@@ -359,7 +359,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     private String keyFieldOverride;
     // By default, we're doing a full-scan
     private boolean forceFullScan = true;
-    private boolean enableOptimizedLogBlocksScan = false;
     protected boolean allowInflightInstants = false;
     private HoodieRecordMerger recordMerger = new HoodiePreCombineAvroRecordMerger();
     protected HoodieTableMetaClient hoodieTableMetaClient;
@@ -457,12 +456,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
       return this;
     }
 
-    @Override
-    public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
-      this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
-      return this;
-    }
-
     @Override
     public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
       this.recordMerger = HoodieRecordUtils.mergerToPreCombineMode(recordMerger);
@@ -502,7 +495,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
           latestInstantTime, maxMemorySizeInBytes, reverseReader,
           bufferSize, spillableMapBasePath, instantRange,
           diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
-          Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger,
+          Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), recordMerger,
           Option.ofNullable(hoodieTableMetaClient), allowInflightInstants);
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 941d11ab94b2..ddf451f09873 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -48,10 +48,9 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
                                          String latestInstantTime, boolean reverseReader, int bufferSize,
                                          LogRecordScannerCallback callback, RecordDeletionCallback recordDeletionCallback,
                                          Option<InstantRange> instantRange, InternalSchema internalSchema,
-                                         boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
-                                         Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
+                                         HoodieRecordMerger recordMerger, Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
     super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange,
-        false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger,
+        false, true, Option.empty(), internalSchema, Option.empty(), recordMerger,
          hoodieTableMetaClientOption);
     this.callback = callback;
     this.recordDeletionCallback = recordDeletionCallback;
@@ -126,7 +125,6 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
     // specific configurations
     private LogRecordScannerCallback callback;
     private RecordDeletionCallback recordDeletionCallback;
-    private boolean enableOptimizedLogBlocksScan;
     private HoodieRecordMerger recordMerger = new HoodiePreCombineAvroRecordMerger();
     private HoodieTableMetaClient hoodieTableMetaClient;
 
@@ -194,12 +192,6 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
       return this;
     }
 
-    @Override
-    public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
-      this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
-      return this;
-    }
-
     @Override
     public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
       this.recordMerger = HoodieRecordUtils.mergerToPreCombineMode(recordMerger);
@@ -219,7 +211,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
 
       return new HoodieUnMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema,
           latestInstantTime, reverseReader, bufferSize, callback, recordDeletionCallback, instantRange,
-          internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient));
+          internalSchema, recordMerger, Option.ofNullable(hoodieTableMetaClient));
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index d579f3120744..2b35eaff4f30 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -380,7 +380,6 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     private boolean allowInflightInstants = false;
     private boolean emitDelete;
     private boolean sortOutput = false;
-    private Boolean enableOptimizedLogBlockScan = false;
     private Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback = Option.empty();
     private FileGroupRecordBufferLoader<T> recordBufferLoader;
 
@@ -478,11 +477,6 @@ public final class HoodieFileGroupReader<T> implements Closeable {
       return this;
     }
 
-    public Builder<T> withEnableOptimizedLogBlockScan(boolean enableOptimizedLogBlockScan) {
-      this.enableOptimizedLogBlockScan = enableOptimizedLogBlockScan;
-      return this;
-    }
-
     /**
      * If true, the output of the merge will be sorted instead of appending log records to end of the iterator if they do not have matching keys in the base file.
      * This assumes that the base file is already sorted by key.
@@ -513,11 +507,6 @@ public final class HoodieFileGroupReader<T> implements Closeable {
       ValidationUtils.checkArgument(props != null, "Props is required");
       ValidationUtils.checkArgument(baseFileOption != null, "Base file option is required");
       ValidationUtils.checkArgument(partitionPath != null, "Partition path is required");
-      if (enableOptimizedLogBlockScan == null) {
-        // check to see if props contains this key if not explicitly set
-        // otherwise use the default value from the config itself
-        enableOptimizedLogBlockScan = Boolean.valueOf(ConfigUtils.getRawValueWithAltKeys(props, HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, true));
-      }
 
       if (recordBufferLoader == null) {
         recordBufferLoader = FileGroupRecordBufferLoader.createDefault();
@@ -528,7 +517,6 @@ public final class HoodieFileGroupReader<T> implements Closeable {
           .emitDeletes(emitDelete)
           .sortOutputs(sortOutput)
           .allowInflightInstants(allowInflightInstants)
-          .enableOptimizedLogBlockScan(enableOptimizedLogBlockScan)
           .build();
       InputSplit inputSplit = new InputSplit(baseFileOption, recordIterator != null ? Either.right(recordIterator) : Either.left(logFiles == null ? Stream.empty() : logFiles),
           partitionPath, start, length);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
index a12e8b3c28fd..bfb791fe5fa7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
@@ -35,14 +35,12 @@ public class ReaderParameters {
   // the allowInflightInstants flag would need to be set to true. This would 
ensure the HoodieMergedLogRecordReader
   // considers the log records which are inflight.
   private final boolean allowInflightInstants;
-  private final boolean enableOptimizedLogBlockScan;
 
-  private ReaderParameters(boolean useRecordPosition, boolean emitDelete, 
boolean sortOutput, boolean allowInflightInstants, boolean 
enableOptimizedLogBlockScan) {
+  private ReaderParameters(boolean useRecordPosition, boolean emitDelete, 
boolean sortOutput, boolean allowInflightInstants) {
     this.useRecordPosition = useRecordPosition;
     this.emitDelete = emitDelete;
     this.sortOutput = sortOutput;
     this.allowInflightInstants = allowInflightInstants;
-    this.enableOptimizedLogBlockScan = enableOptimizedLogBlockScan;
   }
 
   public boolean useRecordPosition() {
@@ -61,10 +59,6 @@ public class ReaderParameters {
     return allowInflightInstants;
   }
 
-  public boolean enableOptimizedLogBlockScan() {
-    return enableOptimizedLogBlockScan;
-  }
-
   static Builder builder() {
     return new Builder();
   }
@@ -74,7 +68,6 @@ public class ReaderParameters {
     private boolean emitDelete = false;
     private boolean sortOutput = false;
     private boolean allowInflightInstants = false;
-    private boolean enableOptimizedLogBlockScan = false;
 
     public Builder shouldUseRecordPosition(boolean shouldUseRecordPosition) {
       this.shouldUseRecordPosition = shouldUseRecordPosition;
@@ -96,13 +89,8 @@ public class ReaderParameters {
       return this;
     }
 
-    public Builder enableOptimizedLogBlockScan(boolean 
enableOptimizedLogBlockScan) {
-      this.enableOptimizedLogBlockScan = enableOptimizedLogBlockScan;
-      return this;
-    }
-
     public ReaderParameters build() {
-      return new ReaderParameters(shouldUseRecordPosition, emitDelete, 
sortOutput, allowInflightInstants, enableOptimizedLogBlockScan);
+      return new ReaderParameters(shouldUseRecordPosition, emitDelete, 
sortOutput, allowInflightInstants);
     }
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
index 4614b25c2049..6cbaf189ce3b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
@@ -49,7 +49,6 @@ abstract class LogScanningRecordBufferLoader {
         .withRecordBuffer(recordBuffer)
         .withAllowInflightInstants(readerParameters.allowInflightInstants())
         .withMetaClient(hoodieTableMetaClient)
-        
.withOptimizedLogBlocksScan(readerParameters.enableOptimizedLogBlockScan())
         .build()) {
       
readStats.setTotalLogReadTimeMs(logRecordReader.getTotalTimeTakenToReadAndMergeBlocks());
       
readStats.setTotalUpdatedRecordsCompacted(logRecordReader.getNumMergedRecordsInLog());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 61c34915dcc2..4f0fe0bc7e40 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -548,7 +548,6 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         .withRequestedSchema(SCHEMA)
         .withProps(fileGroupReaderProps)
         .withRecordBufferLoader(recordBufferLoader)
-        
.withEnableOptimizedLogBlockScan(metadataConfig.isOptimizedLogBlocksScanEnabled())
         .build();
 
     return fileGroupReader.getClosableIterator();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index f4f2ad9cbf6b..a6b267d19c4c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -868,8 +868,7 @@ public class HoodieTableMetadataUtil {
                                                                                
  HoodieTableMetaClient dataTableMetaClient,
                                                                                
  int writesFileIdEncoding,
                                                                                
  String instantTime,
-                                                                               
  EngineType engineType,
-                                                                               
  boolean enableOptimizeLogBlocksScan) {
+                                                                               
  EngineType engineType) {
     List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
         .flatMap(Collection::stream).collect(Collectors.toList());
     // Return early if there are no write stats, or if the operation is a 
compaction.
@@ -934,7 +933,7 @@ public class HoodieTableMetadataUtil {
                   .collect(Collectors.toList());
               // Extract revived and deleted keys
               Pair<Set<String>, Set<String>> revivedAndDeletedKeys = 
getRevivedAndDeletedKeysFromMergedLogs(dataTableMetaClient, instantTime, 
allLogFilePaths, finalWriterSchemaOpt,
-                  currentLogFilePaths, partitionPath, 
readerContextFactory.getContext(), enableOptimizeLogBlocksScan);
+                  currentLogFilePaths, partitionPath, 
readerContextFactory.getContext());
               Set<String> revivedKeys = revivedAndDeletedKeys.getLeft();
               Set<String> deletedKeys = revivedAndDeletedKeys.getRight();
               // Process revived keys to create updates
@@ -989,7 +988,6 @@ public class HoodieTableMetadataUtil {
    * @param logFilePaths         list of log file paths including current and 
previous file slices
    * @param finalWriterSchemaOpt records schema
    * @param currentLogFilePaths  list of log file paths for the current instant
-   * @param enableOptimizedLogBlocksScan - flag used to enable scanInternalV2 
for log blocks in data table
    * @return pair of revived and deleted keys
    */
   @VisibleForTesting
@@ -999,8 +997,7 @@ public class HoodieTableMetadataUtil {
                                                                                
           Option<HoodieSchema> finalWriterSchemaOpt,
                                                                                
           List<String> currentLogFilePaths,
                                                                                
           String partitionPath,
-                                                                               
           HoodieReaderContext<T> readerContext,
-                                                                               
           boolean enableOptimizedLogBlocksScan) {
+                                                                               
           HoodieReaderContext<T> readerContext) {
     // Separate out the current log files
     List<String> logFilePathsWithoutCurrentLogFiles = logFilePaths.stream()
         .filter(logFilePath -> !currentLogFilePaths.contains(logFilePath))
@@ -1008,7 +1005,7 @@ public class HoodieTableMetadataUtil {
     if (logFilePathsWithoutCurrentLogFiles.isEmpty()) {
       // Only current log file is present, so we can directly get the deleted 
record keys from it and return the RLI records.
       try (ClosableIterator<BufferedRecord<T>> currentLogRecords =
-               getLogRecords(currentLogFilePaths, dataTableMetaClient, 
finalWriterSchemaOpt, instantTime, partitionPath, readerContext, 
enableOptimizedLogBlocksScan)) {
+               getLogRecords(currentLogFilePaths, dataTableMetaClient, 
finalWriterSchemaOpt, instantTime, partitionPath, readerContext)) {
         Set<String> deletedKeys = new HashSet<>();
         currentLogRecords.forEachRemaining(record -> {
           if (record.isDelete()) {
@@ -1019,7 +1016,7 @@ public class HoodieTableMetadataUtil {
       }
     }
     return getRevivedAndDeletedKeys(dataTableMetaClient, instantTime, 
partitionPath, readerContext,
-            logFilePaths, finalWriterSchemaOpt, 
logFilePathsWithoutCurrentLogFiles, enableOptimizedLogBlocksScan);
+            logFilePaths, finalWriterSchemaOpt, 
logFilePathsWithoutCurrentLogFiles);
   }
 
   private static <T> Pair<Set<String>, Set<String>> 
getRevivedAndDeletedKeys(HoodieTableMetaClient dataTableMetaClient,
@@ -1028,14 +1025,13 @@ public class HoodieTableMetadataUtil {
                                                                              
HoodieReaderContext<T> readerContext,
                                                                              
List<String> logFilePaths,
                                                                              
Option<HoodieSchema> finalWriterSchemaOpt,
-                                                                             
List<String> logFilePathsWithoutCurrentLogFiles,
-                                                                             
boolean enableOptimizedLogBlocksScan) {
+                                                                             
List<String> logFilePathsWithoutCurrentLogFiles) {
     // Partition valid (non-deleted) and deleted keys from all log files, 
including current, in a single pass
     Set<String> validKeysForAllLogs = new HashSet<>();
     Set<String> deletedKeysForAllLogs = new HashSet<>();
     // Fetch log records for all log files
     try (ClosableIterator<BufferedRecord<T>> allLogRecords =
-             getLogRecords(logFilePaths, dataTableMetaClient, 
finalWriterSchemaOpt, instantTime, partitionPath, readerContext, 
enableOptimizedLogBlocksScan)) {
+             getLogRecords(logFilePaths, dataTableMetaClient, 
finalWriterSchemaOpt, instantTime, partitionPath, readerContext)) {
       allLogRecords.forEachRemaining(record -> {
         if (record.isDelete()) {
           deletedKeysForAllLogs.add(record.getRecordKey());
@@ -1050,7 +1046,7 @@ public class HoodieTableMetadataUtil {
     Set<String> deletedKeysForPreviousLogs = new HashSet<>();
     // Fetch log records for previous log files (excluding the current log 
files)
     try (ClosableIterator<BufferedRecord<T>> previousLogRecords =
-             getLogRecords(logFilePathsWithoutCurrentLogFiles, 
dataTableMetaClient, finalWriterSchemaOpt, instantTime, partitionPath, 
readerContext, enableOptimizedLogBlocksScan)) {
+             getLogRecords(logFilePathsWithoutCurrentLogFiles, 
dataTableMetaClient, finalWriterSchemaOpt, instantTime, partitionPath, 
readerContext)) {
       previousLogRecords.forEachRemaining(record -> {
         if (record.isDelete()) {
           deletedKeysForPreviousLogs.add(record.getRecordKey());
@@ -1068,8 +1064,7 @@ public class HoodieTableMetadataUtil {
                                                                        
Option<HoodieSchema> writerSchemaOpt,
                                                                        String 
latestCommitTimestamp,
                                                                        String 
partitionPath,
-                                                                       
HoodieReaderContext<T> readerContext,
-                                                                       boolean 
enableOptimizedLogBlocksScan) {
+                                                                       
HoodieReaderContext<T> readerContext) {
     if (writerSchemaOpt.isPresent() && !logFilePaths.isEmpty()) {
       List<HoodieLogFile> logFiles = 
logFilePaths.stream().map(HoodieLogFile::new).collect(Collectors.toList());
       FileSlice fileSlice = new FileSlice(partitionPath, 
logFiles.get(0).getFileId(), logFiles.get(0).getDeltaCommitTime());
@@ -1100,7 +1095,6 @@ public class HoodieTableMetadataUtil {
           .withMetaClient(datasetMetaClient)
           .withAllowInflightInstants(true)
           .withRecordBuffer(recordBuffer)
-          .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
           .build()) {
         // initializes the record buffer with the log records
         return recordBuffer.getLogRecordIterator();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 8896945e21f5..797591426d6c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -142,7 +142,6 @@ public class FormatUtils {
         .withProps(typedProps)
         .withShouldUseRecordPosition(false)
         .withEmitDelete(emitDelete)
-        
.withEnableOptimizedLogBlockScan(writeConfig.enableOptimizedLogBlocksScan())
         .build();
   }
 
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 753f07350230..cde861f8c5f2 100755
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -696,27 +696,24 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     reader.close();
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testFilteringUncommittedLogBlocksPreTableVersion8(boolean 
enableOptimizedLogBlocksScan) throws Exception {
+  @Test
+  public void testFilteringUncommittedLogBlocksPreTableVersion8() throws 
Exception {
     HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(basePath);
     metaClient.getTableConfig().setTableVersion(HoodieTableVersion.SIX);
     HoodieTableConfig.update(metaClient.getStorage(), 
metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
-    
testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType.ROCKS_DB, 
true, enableOptimizedLogBlocksScan, true, true);
+    
testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType.ROCKS_DB, 
true, true, true);
   }
 
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType 
diskMapType,
-                                                  boolean isCompressionEnabled,
-                                                  boolean 
enableOptimizedLogBlocksScan)
+                                                  boolean isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
-    testBasicAppendAndScanMultipleFiles(diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan, false, false);
+    testBasicAppendAndScanMultipleFiles(diskMapType, isCompressionEnabled, 
false, false);
   }
 
   private void 
testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType 
diskMapType,
                                                    boolean 
isCompressionEnabled,
-                                                   boolean 
enableOptimizedLogBlocksScan,
                                                    boolean 
produceUncommittedLogBlocks,
                                                    boolean preTableVersion8)
       throws IOException, URISyntaxException, InterruptedException {
@@ -750,7 +747,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .build();
 
     List<IndexedRecord> scannedRecords = new ArrayList<>();
@@ -773,8 +769,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType,
-                                                boolean isCompressionEnabled,
-                                                boolean 
enableOptimizedLogBlocksScan)
+                                                boolean isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
     // Generate 3 delta-log files w/ random records
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
@@ -800,7 +795,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withForceFullScan(false)
         .build();
 
@@ -861,8 +855,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testBasicAppendAndPartialScanningByKeyPrefixes(ExternalSpillableMap.DiskMapType 
diskMapType,
-                                                             boolean 
isCompressionEnabled,
-                                                             boolean 
enableOptimizedLogBlocksScan)
+                                                             boolean 
isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
     // Generate 3 delta-log files w/ random records
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
@@ -888,7 +881,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withForceFullScan(false)
         .build();
 
@@ -1154,8 +1146,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @MethodSource("testArguments")
   public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType 
diskMapType,
-                                           boolean isCompressionEnabled,
-                                           boolean 
enableOptimizedLogBlocksScan)
+                                           boolean isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -1192,15 +1183,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     Set<String> originalKeys =
         copyOfRecords1.stream().map(s -> ((GenericRecord) 
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
             .collect(Collectors.toSet());
-    checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled,
         200, 200, Option.of(originalKeys));
   }
 
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.DiskMapType 
diskMapType,
-                                                           boolean 
isCompressionEnabled,
-                                                           boolean 
enableOptimizedLogBlocksScan)
+                                                           boolean 
isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -1256,15 +1246,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     Set<String> originalKeys =
         copyOfRecords1.stream().map(s -> ((GenericRecord) 
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
             .collect(Collectors.toSet());
-    checkLogBlocksAndKeys("102", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("102", schema, diskMapType, isCompressionEnabled,
         200, 200, Option.of(originalKeys));
   }
 
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.DiskMapType 
diskMapType,
-                                                            boolean 
isCompressionEnabled,
-                                                            boolean 
enableOptimizedLogBlocksScan)
+                                                            boolean 
isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -1327,15 +1316,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     Set<String> originalKeys =
         copyOfRecords1.stream().map(s -> ((GenericRecord) 
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
             .collect(Collectors.toSet());
-    checkLogBlocksAndKeys("103", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("103", schema, diskMapType, isCompressionEnabled,
         200, 200, Option.of(originalKeys));
   }
 
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.DiskMapType 
diskMapType,
-                                                           boolean 
isCompressionEnabled,
-                                                           boolean 
enableOptimizedLogBlocksScan)
+                                                           boolean 
isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -1402,7 +1390,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .build();
 
     assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 
records");
@@ -1449,7 +1436,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .build();
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
     final List<Boolean> newEmptyPayloads = new ArrayList<>();
@@ -1477,8 +1463,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithCommitBeforeAndAfterRollback(ExternalSpillableMap.DiskMapType
 diskMapType,
-                                                                      boolean 
isCompressionEnabled,
-                                                                      boolean 
enableOptimizedLogBlocksScan)
+                                                                      boolean 
isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -1561,7 +1546,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .build();
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
     final List<Boolean> newEmptyPayloads = new ArrayList<>();
@@ -1719,8 +1703,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.DiskMapType 
diskMapType,
-                                                         boolean 
isCompressionEnabled,
-                                                         boolean 
enableOptimizedLogBlocksScan)
+                                                         boolean 
isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
 
     // Write a Data block and Delete block with same InstantTime (written in 
same batch)
@@ -1781,7 +1764,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.appendBlock(commandBlock);
     writer.close();
 
-    checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled,
         0, 0, Option.empty());
     FileCreateUtilsLegacy.deleteDeltaCommit(basePath, "100", storage);
   }
@@ -1789,8 +1772,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillableMap.DiskMapType
 diskMapType,
-                                                                 boolean 
isCompressionEnabled,
-                                                                 boolean 
enableOptimizedLogBlocksScan)
+                                                                 boolean 
isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
 
     // Write a Data block and Delete block with same InstantTime (written in 
same batch)
@@ -1836,7 +1818,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.appendBlock(commandBlock);
     writer.close();
 
-    checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled,
         0, 0, Option.empty());
     FileCreateUtilsLegacy.deleteDeltaCommit(basePath, "100", storage);
   }
@@ -1844,8 +1826,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.DiskMapType 
diskMapType,
-                                                         boolean 
isCompressionEnabled,
-                                                         boolean 
enableOptimizedLogBlocksScan)
+                                                         boolean 
isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -1873,15 +1854,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.appendBlock(commandBlock);
     writer.close();
 
-    checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled,
         100, 100, Option.empty());
   }
 
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillableMap.DiskMapType
 diskMapType,
-                                                                  boolean 
isCompressionEnabled,
-                                                                  boolean 
enableOptimizedLogBlocksScan)
+                                                                  boolean 
isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
 
     // Write 3 Data blocks with same InstantTime (written in same batch)
@@ -1930,15 +1910,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.appendBlock(commandBlock);
     writer.close();
 
-    checkLogBlocksAndKeys("101", schema, diskMapType, isCompressionEnabled, 
enableOptimizedLogBlocksScan,
+    checkLogBlocksAndKeys("101", schema, diskMapType, isCompressionEnabled,
         0, 0, Option.empty());
   }
 
   @ParameterizedTest
   @MethodSource("testArguments")
   void 
testLogReaderWithDifferentVersionsOfDeleteBlocks(ExternalSpillableMap.DiskMapType
 diskMapType,
-                                                        boolean 
isCompressionEnabled,
-                                                        boolean 
enableOptimizedLogBlocksScan)
+                                                        boolean 
isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -2028,7 +2007,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .build()) {
       assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 
records");
       final List<String> readKeys = new ArrayList<>(200);
@@ -2091,7 +2069,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
     // Should be able to read all 110 records
     checkLogBlocksAndKeys("101", schema, 
ExternalSpillableMap.DiskMapType.BITCASK, false,
-        false, 110, 110, Option.empty());
+         110, 110, Option.empty());
 
     // Write a rollback for commit 100 which is not the latest commit
     header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
@@ -2102,7 +2080,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
     // Should only be able to read 10 records from commit 101
     checkLogBlocksAndKeys("101", schema, 
ExternalSpillableMap.DiskMapType.BITCASK, false,
-        false, 10, 10, Option.empty());
+        10, 10, Option.empty());
 
     // Write a rollback for commit 101 which is the latest commit
     header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
@@ -2113,15 +2091,14 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.close();
 
     // Should not read any records as both commits are rolled back
-    checkLogBlocksAndKeys("101", schema, 
ExternalSpillableMap.DiskMapType.BITCASK, false,
+    checkLogBlocksAndKeys("101", schema, 
ExternalSpillableMap.DiskMapType.BITCASK,
         false, 0, 0, Option.empty());
   }
 
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalSpillableMap.DiskMapType
 diskMapType,
-                                                                         
boolean isCompressionEnabled,
-                                                                         
boolean enableOptimizedLogBlocksScan)
+                                                                         
boolean isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
 
     // Write 3 Data blocks with same InstantTime (written in same batch)
@@ -2205,12 +2182,12 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.close();
 
     checkLogBlocksAndKeys("101", schema, 
ExternalSpillableMap.DiskMapType.BITCASK, false,
-        false, 0, 0, Option.empty());
+         0, 0, Option.empty());
     FileCreateUtilsLegacy.deleteDeltaCommit(basePath, "100", storage);
   }
 
   @ParameterizedTest
-  @MethodSource("testArgumentsWithoutOptimizedScanArg")
+  @MethodSource("testArguments")
   public void testAvroLogRecordReaderWithMixedInsertsCorruptsRollbackAndMergedLogBlock(ExternalSpillableMap.DiskMapType diskMapType,
                                                                                        boolean isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
@@ -2384,7 +2361,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withOptimizedLogBlocksScan(true)
         .build();
     assertEquals(600, scanner.getTotalLogRecords(), "We would read 600 records 
from scanner");
     final List<String> readKeys = new ArrayList<>();
@@ -2417,8 +2393,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
    */
   private void testAvroLogRecordReaderMergingMultipleLogFiles(int 
numRecordsInLog1, int numRecordsInLog2,
                                                               
ExternalSpillableMap.DiskMapType diskMapType,
-                                                              boolean 
isCompressionEnabled,
-                                                              boolean 
enableOptimizedLogBlocksScan) {
+                                                              boolean 
isCompressionEnabled) {
     try {
       // Write one Data block with same InstantTime (written in same batch)
       HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
@@ -2471,7 +2446,6 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
           .withSpillableMapBasePath(spillableBasePath)
           .withDiskMapType(diskMapType)
           .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-          .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
           .build();
 
       assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2), 
scanner.getNumMergedRecordsInLog(),
@@ -2485,40 +2459,37 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(ExternalSpillableMap.DiskMapType
 diskMapType,
-                                                                       boolean 
isCompressionEnabled,
-                                                                       boolean 
enableOptimizedLogBlocksScan) {
+                                                                       boolean 
isCompressionEnabled) {
     /*
      * FIRST_ATTEMPT_FAILED:
      * Original task from the stage attempt failed, but subsequent stage retry 
succeeded.
      */
     testAvroLogRecordReaderMergingMultipleLogFiles(77, 100,
-        diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan);
+        diskMapType, isCompressionEnabled);
   }
 
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(ExternalSpillableMap.DiskMapType
 diskMapType,
-                                                                        
boolean isCompressionEnabled,
-                                                                        
boolean enableOptimizedLogBlocksScan) {
+                                                                        
boolean isCompressionEnabled) {
     /*
      * SECOND_ATTEMPT_FAILED:
      * Original task from stage attempt succeeded, but subsequent retry 
attempt failed.
      */
     testAvroLogRecordReaderMergingMultipleLogFiles(100, 66,
-        diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan);
+        diskMapType, isCompressionEnabled);
   }
 
   @ParameterizedTest
   @MethodSource("testArguments")
   public void 
testAvroLogRecordReaderTasksSucceededInBothStageAttempts(ExternalSpillableMap.DiskMapType
 diskMapType,
-                                                                       boolean 
isCompressionEnabled,
-                                                                       boolean 
enableOptimizedLogBlocksScan) {
+                                                                       boolean 
isCompressionEnabled) {
     /*
      * BOTH_ATTEMPTS_SUCCEEDED:
      * Original task from the stage attempt and duplicate task from the stage 
retry succeeded.
      */
     testAvroLogRecordReaderMergingMultipleLogFiles(100, 100,
-        diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan);
+        diskMapType, isCompressionEnabled);
   }
 
   @Test
@@ -2860,25 +2831,11 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   }
 
   private static Stream<Arguments> testArguments() {
-    // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: enableOptimizedLogBlocksScan
-    return Stream.of(
-        arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false),
-        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false),
-        arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, false),
-        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, false),
-        arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, true),
-        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, true),
-        arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, true),
-        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true)
-    );
-  }
-
-  private static Stream<Arguments> testArgumentsWithoutOptimizedScanArg() {
     // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled
     return Stream.of(
         arguments(ExternalSpillableMap.DiskMapType.BITCASK, false),
-        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false),
         arguments(ExternalSpillableMap.DiskMapType.BITCASK, true),
+        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false),
         arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true)
     );
   }
@@ -2941,7 +2898,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
   }
 
   private void checkLogBlocksAndKeys(String latestInstantTime, HoodieSchema schema, ExternalSpillableMap.DiskMapType diskMapType,
-                                     boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan, int expectedTotalRecords,
+                                     boolean isCompressionEnabled, int expectedTotalRecords,
                                      int expectedTotalKeys, Option<Set<String>> expectedKeys) throws IOException {
     List<String> allLogFiles =
         FSUtils.getAllLogFiles(storage, partitionPath, "test-fileid1", 
HoodieLogFile.DELTA_EXTENSION, "100")
@@ -2958,8 +2915,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         .withBufferSize(BUFFER_SIZE)
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
-        .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan);
+        .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled);
     try (HoodieMergedLogRecordScanner scanner = builder.build()) {
       assertEquals(expectedTotalRecords, scanner.getTotalLogRecords(), "There 
should be " + expectedTotalRecords + " records");
       final Set<String> readKeys = new HashSet<>();
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java
index 4c08b346eed6..83d6b15c136b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java
@@ -18,14 +18,15 @@
 
 package org.apache.hudi.hadoop.avro;
 
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.util.ContextUtil;
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 4d64f85e6bf0..8d0d1765eb5e 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -22,7 +22,6 @@ import org.apache.hudi.avro.AvroRecordContext;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.engine.RecordContext;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
@@ -114,8 +113,6 @@ public class RealtimeCompactedRecordReader extends 
AbstractRealtimeRecordReader
         
.withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
 HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
         
.withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
             
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-        
.withOptimizedLogBlocksScan(jobConf.getBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
-            
Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue())))
         
.withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
         .build();
   }
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
index e2cab948111c..e5f2a7d573ae 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
@@ -18,18 +18,18 @@
 
 package org.apache.hudi.hadoop;
 
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.hadoop.utils.HiveAvroSerializer;
 
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.MockitoAnnotations;
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 93d8161b42a9..daac532af401 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -23,7 +23,6 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -297,7 +296,6 @@ public class DFSHoodieDatasetInputReader extends 
DFSDeltaInputReader {
           
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
           
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
           
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-          
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
           
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
           .build();
       // readAvro log files
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index eb4f5828fa6a..7a97367bb3c5 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -124,9 +124,6 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     readerProps.setProperty(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key,
       
configuration.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
         
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()).toString)
-    
readerProps.setProperty(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key,
-      
configuration.get(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
-        HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
     readerProps
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
index ffc38ee64dfe..6eaade123a64 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
@@ -113,8 +113,7 @@ class PartitionBucketIndexManager extends BaseProcedure
 
       config = config ++ Map(OPERATION.key -> BULK_INSERT_OPERATION_OPT_VAL,
         HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> 
WriteOperationType.BUCKET_RESCALE.value(),
-        ENABLE_ROW_WRITER.key() -> "true",
-        HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key -> 
writeClient.getConfig.enableOptimizedLogBlocksScan.toString)
+        ENABLE_ROW_WRITER.key() -> "true")
 
       // Determine which operation to perform
       if (showConfig) {
@@ -225,9 +224,6 @@ class PartitionBucketIndexManager extends BaseProcedure
         spark.sparkContext.parallelize(allFileSlice, 
allFileSlice.size).flatMap(fileSlice => {
           // instantiate other supporting cast
           val internalSchemaOption: Option[InternalSchema] = Option.empty()
-          // get this value from config, which has obtained this from write client
-          val enableOptimizedLogBlockScan = config.getOrElse(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
-            HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()).toBoolean
           // instantiate FG reader
           val fileGroupReader = HoodieFileGroupReader.newBuilder()
             .withReaderContext(readerContextFactory.getContext)
@@ -239,7 +235,6 @@ class PartitionBucketIndexManager extends BaseProcedure
             .withInternalSchema(internalSchemaOption) // not support evolution 
of schema for now
             .withProps(metaClient.getTableConfig.getProps)
             .withShouldUseRecordPosition(false)
-            .withEnableOptimizedLogBlockScan(enableOptimizedLogBlockScan)
             .build()
           val iterator = fileGroupReader.getClosableIterator
           CloseableIteratorListener.addListener(iterator)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index 4ee8c34c935d..f24ac473e1d0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -242,7 +242,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends 
HoodieClientTestBase {
   @Test
   public void testLogCompactionOnMORTableWithoutBaseFile() throws Exception {
     HoodieCompactionConfig compactionConfig = 
HoodieCompactionConfig.newBuilder()
-        .withEnableOptimizedLogBlocksScan("true")
         .withMaxNumDeltaCommitsBeforeCompaction(1)
         .withLogCompactionBlocksThreshold(1)
         .build();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
index 90720d57d8a1..b35da0450cfd 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
@@ -298,7 +298,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration 
extends HoodieClientTestBa
       HoodieCommitMetadata compactionCommitMetadata = (HoodieCommitMetadata) 
compactionWriteMetadata.getCommitMetadata().get();
       // no RLI records should be generated for compaction operation.
       assertTrue(convertMetadataToRecordIndexRecords(context, 
compactionCommitMetadata, writeConfig.getMetadataConfig(),
-          metaClient, writeConfig.getWritesFileIdEncoding(), 
compactionInstantOpt.get(), EngineType.SPARK, 
writeConfig.enableOptimizedLogBlocksScan()).isEmpty());
+          metaClient, writeConfig.getWritesFileIdEncoding(), 
compactionInstantOpt.get(), EngineType.SPARK).isEmpty());
     }
   }
 
@@ -533,7 +533,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration 
extends HoodieClientTestBa
             // used for RLI
             HoodieReaderContext<?> readerContext = 
context.getReaderContextFactory(metaClient).getContext();
             
finalActualDeletes.addAll(getRevivedAndDeletedKeysFromMergedLogs(metaClient, 
latestCommitTimestamp, Collections.singletonList(fullFilePath.toString()), 
writerSchemaOpt,
-                Collections.singletonList(fullFilePath.toString()), 
writeStat.getPartitionPath(), readerContext, 
writeConfig.enableOptimizedLogBlocksScan()).getValue());
+                Collections.singletonList(fullFilePath.toString()), 
writeStat.getPartitionPath(), readerContext).getValue());
 
             // used in SI flow
             
actualUpdatesAndDeletes.addAll(getRecordKeys(writeStat.getPartitionPath(), 
writeStat.getPrevCommit(), writeStat.getFileId(),
@@ -724,7 +724,6 @@ public class TestMetadataUtilRLIandSIRecordGeneration 
extends HoodieClientTestBa
           .withHoodieTableMetaClient(datasetMetaClient)
           .withProps(properties)
           .withEmitDelete(true)
-          
.withEnableOptimizedLogBlockScan(writeConfig.enableOptimizedLogBlocksScan())
           .build();
       Set<String> allRecordKeys = new HashSet<>();
       try (ClosableIterator<String> keysIterator = 
reader.getClosableKeyIterator()) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 56c1afcd3bdc..5e88aeb8d5e1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -1529,7 +1529,6 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
         .withRequestedSchema(schema)
         .withDataSchema(schema)
         .withProps(new TypedProperties())
-        
.withEnableOptimizedLogBlockScan(writeConfig.getMetadataConfig().isOptimizedLogBlocksScanEnabled())
         .build();
 
     try (ClosableIterator<HoodieRecord<IndexedRecord>> iter = 
fileGroupReader.getClosableHoodieRecordIterator()) {

Reply via email to