This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e0ca6dd0d52 [HUDI-7652] Add new `HoodieMergeKey` API to support simple and composite keys (#11077)
e0ca6dd0d52 is described below

commit e0ca6dd0d52c4171d5b4ee83cbc7ef684cc471dc
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri May 17 14:57:29 2024 +0530

    [HUDI-7652] Add new `HoodieMergeKey` API to support simple and composite keys (#11077)
    
    Introduce a new abstract class `BaseHoodieMergedLogRecordScanner` that
    extends `AbstractHoodieLogRecordReader`. The new abstract class holds the
    `records` map as `ExternalSpillableMap<K, HoodieRecord>` and exposes a
    `public abstract Map<K, HoodieRecord> getRecords()` API. The existing
    `HoodieMergedLogRecordScanner` now derives from the new abstract class
    (instead of `AbstractHoodieLogRecordReader`) and uses String keys.
---
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |   2 +-
 .../functional/TestHoodieBackedMetadata.java       |   2 +-
 .../functional/TestHoodieBackedTableMetadata.java  |   2 +-
 .../common/model/HoodieMetadataRecordMerger.java   |  43 ++++
 .../hudi/common/model/HoodieRecordMerger.java      |   8 +
 .../table/log/AbstractHoodieLogRecordReader.java   |  25 ++
 .../log/BaseHoodieMergedLogRecordScanner.java      | 260 ++++++++++++++++++++
 .../table/log/HoodieMergedLogRecordScanner.java    | 265 +++-----------------
 .../log/HoodieMetadataMergedLogRecordScanner.java  | 270 +++++++++++++++++++++
 .../hudi/metadata/HoodieBackedTableMetadata.java   |   2 +-
 .../metadata/HoodieMetadataLogRecordReader.java    |  41 +++-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   3 +-
 .../model/TestHoodieMetadataRecordMerger.java      |  65 +++++
 .../apache/hudi/functional/TestMORDataSource.scala |   2 +-
 14 files changed, 740 insertions(+), 250 deletions(-)
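
For orientation, a minimal usage sketch of the builder change this patch makes
(hedged, not part of the commit: `metadataMetaClient`, `logFilePaths`, and
`partitionName` are stand-ins borrowed from the test updates below, and only
builder calls visible in this patch are shown):

    import java.util.List;

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;

    class MetadataReaderSketch {
      // newBuilder(...) now takes the metadata partition name; for the
      // "secondary_index" partition the builder internally constructs a
      // HoodieMetadataMergedLogRecordScanner (using HoodieMetadataRecordMerger)
      // instead of the plain HoodieMergedLogRecordScanner.
      static HoodieMetadataLogRecordReader open(HoodieTableMetaClient metadataMetaClient,
                                                List<String> logFilePaths,
                                                String partitionName) {
        return HoodieMetadataLogRecordReader.newBuilder(partitionName) // was: newBuilder()
            .withStorage(metadataMetaClient.getStorage())
            .withBasePath(metadataMetaClient.getBasePath())
            .withLogFilePaths(logFilePaths)
            .build();
      }
    }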

diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index ae17a34da3e..c2b85bd70b0 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -946,7 +946,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
     if (enableMetaFields) {
       schema = HoodieAvroUtils.addMetadataFields(schema);
     }
-    HoodieMetadataLogRecordReader logRecordReader = HoodieMetadataLogRecordReader.newBuilder()
+    HoodieMetadataLogRecordReader logRecordReader = HoodieMetadataLogRecordReader.newBuilder(FILES.getPartitionPath())
         .withStorage(metadataMetaClient.getStorage())
         .withBasePath(metadataMetaClient.getBasePath())
         .withLogFilePaths(logFilePaths)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 700b9f1cd24..4da78d84980 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1413,7 +1413,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     if (enableMetaFields) {
       schema = HoodieAvroUtils.addMetadataFields(schema);
     }
-    HoodieMetadataLogRecordReader logRecordReader = HoodieMetadataLogRecordReader.newBuilder()
+    HoodieMetadataLogRecordReader logRecordReader = HoodieMetadataLogRecordReader.newBuilder(FILES.getPartitionPath())
         .withStorage(metadataMetaClient.getStorage())
         .withBasePath(metadataMetaClient.getBasePath())
         .withLogFilePaths(logFilePaths)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index d3dcf94f641..cec201ee754 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -491,7 +491,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
    */
   private void verifyMetadataMergedRecords(HoodieTableMetaClient metadataMetaClient, List<String> logFilePaths, String latestCommitTimestamp) {
     Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
-    HoodieMetadataLogRecordReader logRecordReader = HoodieMetadataLogRecordReader.newBuilder()
+    HoodieMetadataLogRecordReader logRecordReader = HoodieMetadataLogRecordReader.newBuilder(FILES.getPartitionPath())
         .withStorage(metadataMetaClient.getStorage())
         .withBasePath(metadataMetaClient.getBasePath())
         .withLogFilePaths(logFilePaths)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java
new file mode 100644
index 00000000000..c118cd803bc
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMetadataRecordMerger.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Record merger that accumulates metadata records.
+ */
+public class HoodieMetadataRecordMerger extends HoodiePreCombineAvroRecordMerger {
+
+  public static final HoodieMetadataRecordMerger INSTANCE = new HoodieMetadataRecordMerger();
+
+  @Override
+  public List<Pair<HoodieRecord, Schema>> fullOuterMerge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
+    // TODO: Implement this method for secondary keys. Currently, it just mimics the superclass.
+    return Collections.singletonList(super.merge(older, oldSchema, newer, newSchema, props).get());
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index cdd41242ef1..62cde38a351 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 /**
  * HoodieMerge defines how to merge two records. It is a stateless component.
@@ -126,6 +127,13 @@ public interface HoodieRecordMerger extends Serializable {
     return true;
   }
 
+  /**
+   * Merges two records with the same key in full outer merge fashion, i.e., all fields from both records are included.
+   */
+  default List<Pair<HoodieRecord, Schema>> fullOuterMerge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
+    throw new UnsupportedOperationException("Full outer merging logic is not implemented.");
+  }
+
   default String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg) {
     ArrayList<String> requiredFields = new ArrayList<>();
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index c545640e3eb..77f91a5b540 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -864,6 +865,30 @@ public abstract class AbstractHoodieLogRecordReader {
       throw new UnsupportedOperationException();
     }
 
+    public Builder withKeyFieldOverride(String keyFieldOverride) {
+      throw new UnsupportedOperationException();
+    }
+
+    public Builder withForceFullScan(boolean forceFullScan) {
+      throw new UnsupportedOperationException();
+    }
+
+    public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
+      throw new UnsupportedOperationException();
+    }
+
+    public Builder withSpillableMapBasePath(String spillableMapBasePath) {
+      throw new UnsupportedOperationException();
+    }
+
+    public Builder withDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
+      throw new UnsupportedOperationException();
+    }
+
+    public Builder withBitCaskDiskMapCompressionEnabled(boolean bitCaskDiskMapCompressionEnabled) {
+      throw new UnsupportedOperationException();
+    }
+
     public Builder withTableMetaClient(HoodieTableMetaClient hoodieTableMetaClient) {
       throw new UnsupportedOperationException();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieMergedLogRecordScanner.java
new file mode 100644
index 00000000000..17a6284d20e
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieMergedLogRecordScanner.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.SpillableMapUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.apache.avro.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public abstract class BaseHoodieMergedLogRecordScanner<K extends Serializable> extends AbstractHoodieLogRecordReader
+    implements Iterable<HoodieRecord>, Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseHoodieMergedLogRecordScanner.class);
+  // A timer for calculating elapsed time in millis
+  public final HoodieTimer timer = HoodieTimer.create();
+  // Map of compacted/merged records
+  protected final ExternalSpillableMap<K, HoodieRecord> records;
+  // Set of already scanned prefixes allowing us to avoid scanning same prefixes again
+  private final Set<String> scannedPrefixes;
+  // count of merged records in log
+  private long numMergedRecordsInLog;
+  private final long maxMemorySizeInBytes;
+  // Stores the total time taken to perform reading and merging of log blocks
+  private long totalTimeTakenToReadAndMergeBlocks;
+
+  @SuppressWarnings("unchecked")
+  protected BaseHoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, Schema readerSchema,
+                                             String latestInstantTime, Long maxMemorySizeInBytes,
+                                             boolean reverseReader, int bufferSize, String spillableMapBasePath,
+                                             Option<InstantRange> instantRange,
+                                             ExternalSpillableMap.DiskMapType diskMapType,
+                                             boolean isBitCaskDiskMapCompressionEnabled,
+                                             boolean withOperationField, boolean forceFullScan,
+                                             Option<String> partitionName,
+                                             InternalSchema internalSchema,
+                                             Option<String> keyFieldOverride,
+                                             boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
+                                             Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
+    super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize,
+        instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
+        hoodieTableMetaClientOption);
+    try {
+      this.maxMemorySizeInBytes = maxMemorySizeInBytes;
+      // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
+      this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
+          new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
+      this.scannedPrefixes = new HashSet<>();
+    } catch (IOException e) {
+      throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
+    }
+
+    if (forceFullScan) {
+      performScan();
+    }
+  }
+
+  /**
+   * Scans delta-log files processing blocks
+   */
+  public final void scan() {
+    scan(false);
+  }
+
+  public final void scan(boolean skipProcessingBlocks) {
+    if (forceFullScan) {
+      // NOTE: When full-scan is enforced, scanning is invoked upfront (during initialization)
+      return;
+    }
+
+    scanInternal(Option.empty(), skipProcessingBlocks);
+  }
+
+  /**
+   * Provides incremental scanning capability where only provided keys will be looked
+   * up in the delta-log files, scanned and subsequently materialized into the internal
+   * cache
+   *
+   * @param keys to be looked up
+   */
+  public void scanByFullKeys(List<String> keys) {
+    // We can skip scanning in case reader is in full-scan mode, in which case all blocks
+    // are processed upfront (no additional scanning is necessary)
+    if (forceFullScan) {
+      return; // no-op
+    }
+
+    List<String> missingKeys = keys.stream()
+        .filter(key -> !records.containsKey(key))
+        .collect(Collectors.toList());
+
+    if (missingKeys.isEmpty()) {
+      // All the required records are already fetched, no-op
+      return;
+    }
+
+    scanInternal(Option.of(KeySpec.fullKeySpec(missingKeys)), false);
+  }
+
+  /**
+   * Provides incremental scanning capability where only keys matching provided key-prefixes
+   * will be looked up in the delta-log files, scanned and subsequently materialized into
+   * the internal cache
+   *
+   * @param keyPrefixes to be looked up
+   */
+  public void scanByKeyPrefixes(List<String> keyPrefixes) {
+    // We can skip scanning in case reader is in full-scan mode, in which case all blocks
+    // are processed upfront (no additional scanning is necessary)
+    if (forceFullScan) {
+      return;
+    }
+
+    List<String> missingKeyPrefixes = keyPrefixes.stream()
+        .filter(keyPrefix ->
+            // NOTE: We can skip scanning the prefixes that have already
+            //       been covered by the previous scans
+            scannedPrefixes.stream().noneMatch(keyPrefix::startsWith))
+        .collect(Collectors.toList());
+
+    if (missingKeyPrefixes.isEmpty()) {
+      // All the required records are already fetched, no-op
+      return;
+    }
+
+    // NOTE: When looking up by key-prefixes unfortunately we can't short-circuit
+    //       and will have to scan every time as we can't know (based on just
+    //       the records cached) whether particular prefix was scanned or just records
+    //       matching the prefix looked up (by [[scanByFullKeys]] API)
+    scanInternal(Option.of(KeySpec.prefixKeySpec(missingKeyPrefixes)), false);
+    scannedPrefixes.addAll(missingKeyPrefixes);
+  }
+
+  private void performScan() {
+    // Do the scan and merge
+    timer.startTimer();
+
+    scanInternal(Option.empty(), false);
+
+    this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
+    this.numMergedRecordsInLog = records.size();
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Number of log files scanned => {}", logFilePaths.size());
+      LOG.info("MaxMemoryInBytes allowed for compaction => {}", maxMemorySizeInBytes);
+      LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => {}", records.getInMemoryMapNumEntries());
+      LOG.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => {}", records.getCurrentInMemoryMapSize());
+      LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => {}", records.getDiskBasedMapNumEntries());
+      LOG.info("Size of file spilled to disk => {}", records.getSizeOfFileOnDiskInBytes());
+    }
+  }
+
+  @Override
+  public Iterator<HoodieRecord> iterator() {
+    return records.iterator();
+  }
+
+  @Override
+  protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
+    String key = deleteRecord.getRecordKey();
+    HoodieRecord oldRecord = records.get(key);
+    if (oldRecord != null) {
+      // Merge and store the merged record. The ordering val is taken to decide whether the same key record
+      // should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
+      // For same ordering values, uses the natural order(arrival time semantics).
+
+      Comparable curOrderingVal = oldRecord.getOrderingValue(this.readerSchema, this.hoodieTableMetaClient.getTableConfig().getProps());
+      Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
+      // Checks the ordering value does not equal to 0
+      // because we use 0 as the default value which means natural order
+      boolean choosePrev = !deleteOrderingVal.equals(0)
+          && ReflectionUtils.isSameClass(curOrderingVal, deleteOrderingVal)
+          && curOrderingVal.compareTo(deleteOrderingVal) > 0;
+      if (choosePrev) {
+        // The DELETE message is obsolete if the old message has greater orderingVal.
+        return;
+      }
+    }
+    // Put the DELETE record
+    if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
+      records.put((K) key, SpillableMapUtils.generateEmptyPayload(key,
+          deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue(), getPayloadClassFQN()));
+    } else {
+      HoodieEmptyRecord record = new HoodieEmptyRecord<>(new HoodieKey(key, deleteRecord.getPartitionPath()), null, deleteRecord.getOrderingValue(), recordType);
+      records.put((K) key, record);
+    }
+  }
+
+  public abstract Map<K, HoodieRecord> getRecords();
+
+  public HoodieRecord.HoodieRecordType getRecordType() {
+    return recordMerger.getRecordType();
+  }
+
+  public long getNumMergedRecordsInLog() {
+    return numMergedRecordsInLog;
+  }
+
+  protected static <T> HoodieRecord getLatestHoodieRecord(HoodieRecord<T> newRecord, HoodieRecord<T> combinedRecord, String key) {
+    HoodieRecord latestHoodieRecord =
+        combinedRecord.newInstance(new HoodieKey(key, newRecord.getPartitionPath()), newRecord.getOperation());
+
+    latestHoodieRecord.unseal();
+    latestHoodieRecord.setCurrentLocation(newRecord.getCurrentLocation());
+    latestHoodieRecord.seal();
+    return latestHoodieRecord;
+  }
+
+  public long getTotalTimeTakenToReadAndMergeBlocks() {
+    return totalTimeTakenToReadAndMergeBlocks;
+  }
+
+  @Override
+  public void close() {
+    if (records != null) {
+      records.close();
+    }
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index d29ee7bd46b..8679e2455ef 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -18,48 +18,33 @@
 
 package org.apache.hudi.common.table.log;
 
-import org.apache.hudi.common.config.HoodieCommonConfig;
-import org.apache.hudi.common.model.DeleteRecord;
-import org.apache.hudi.common.model.HoodieEmptyRecord;
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.DefaultSizeEstimator;
-import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordUtils;
-import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.SpillableMapUtils;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.io.Closeable;
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.stream.Collectors;
 
+import static java.util.Objects.requireNonNull;
+import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
 import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static org.apache.hudi.common.table.cdc.HoodieCDCUtils.CDC_LOGFILE_SUFFIX;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 
 /**
  * Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of records which will
@@ -74,172 +59,31 @@ import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
  * This results in two I/O passes over the log file.
  */
 @NotThreadSafe
-public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
-    implements Iterable<HoodieRecord>, Closeable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieMergedLogRecordScanner.class);
-  // A timer for calculating elapsed time in millis
-  public final HoodieTimer timer = HoodieTimer.create();
-  // Map of compacted/merged records
-  private final ExternalSpillableMap<String, HoodieRecord> records;
-  // Set of already scanned prefixes allowing us to avoid scanning same prefixes again
-  private final Set<String> scannedPrefixes;
-  // count of merged records in log
-  private long numMergedRecordsInLog;
-  private final long maxMemorySizeInBytes;
-  // Stores the total time taken to perform reading and merging of log blocks
-  private long totalTimeTakenToReadAndMergeBlocks;
+public class HoodieMergedLogRecordScanner extends BaseHoodieMergedLogRecordScanner<String> {
 
   @SuppressWarnings("unchecked")
-  private HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, Schema readerSchema,
-                                       String latestInstantTime, Long maxMemorySizeInBytes,
-                                       boolean reverseReader, int bufferSize, String spillableMapBasePath,
-                                       Option<InstantRange> instantRange,
-                                       ExternalSpillableMap.DiskMapType diskMapType,
-                                       boolean isBitCaskDiskMapCompressionEnabled,
-                                       boolean withOperationField, boolean forceFullScan,
-                                       Option<String> partitionName,
-                                       InternalSchema internalSchema,
-                                       Option<String> keyFieldOverride,
-                                       boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
-                                      Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
-    super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize,
-        instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
-        hoodieTableMetaClientOption);
-    try {
-      this.maxMemorySizeInBytes = maxMemorySizeInBytes;
-      // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
-      this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
-          new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
-      this.scannedPrefixes = new HashSet<>();
-    } catch (IOException e) {
-      throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
-    }
-
-    if (forceFullScan) {
-      performScan();
-    }
-  }
-
-  /**
-   * Scans delta-log files processing blocks
-   */
-  public final void scan() {
-    scan(false);
-  }
-
-  public final void scan(boolean skipProcessingBlocks) {
-    if (forceFullScan) {
-      // NOTE: When full-scan is enforced, scanning is invoked upfront (during initialization)
-      return;
-    }
-
-    scanInternal(Option.empty(), skipProcessingBlocks);
-  }
-
-  /**
-   * Provides incremental scanning capability where only provided keys will be looked
-   * up in the delta-log files, scanned and subsequently materialized into the internal
-   * cache
-   *
-   * @param keys to be looked up
-   */
-  public void scanByFullKeys(List<String> keys) {
-    // We can skip scanning in case reader is in full-scan mode, in which case all blocks
-    // are processed upfront (no additional scanning is necessary)
-    if (forceFullScan) {
-      return; // no-op
-    }
-
-    List<String> missingKeys = keys.stream()
-        .filter(key -> !records.containsKey(key))
-        .collect(Collectors.toList());
-
-    if (missingKeys.isEmpty()) {
-      // All the required records are already fetched, no-op
-      return;
-    }
-
-    scanInternal(Option.of(KeySpec.fullKeySpec(missingKeys)), false);
-  }
-
-  /**
-   * Provides incremental scanning capability where only keys matching provided key-prefixes
-   * will be looked up in the delta-log files, scanned and subsequently materialized into
-   * the internal cache
-   *
-   * @param keyPrefixes to be looked up
-   */
-  public void scanByKeyPrefixes(List<String> keyPrefixes) {
-    // We can skip scanning in case reader is in full-scan mode, in which case all blocks
-    // are processed upfront (no additional scanning is necessary)
-    if (forceFullScan) {
-      return;
-    }
-
-    List<String> missingKeyPrefixes = keyPrefixes.stream()
-        .filter(keyPrefix ->
-            // NOTE: We can skip scanning the prefixes that have already
-            //       been covered by the previous scans
-            scannedPrefixes.stream().noneMatch(keyPrefix::startsWith))
-        .collect(Collectors.toList());
-
-    if (missingKeyPrefixes.isEmpty()) {
-      // All the required records are already fetched, no-op
-      return;
-    }
-
-    // NOTE: When looking up by key-prefixes unfortunately we can't short-circuit
-    //       and will have to scan every time as we can't know (based on just
-    //       the records cached) whether particular prefix was scanned or just records
-    //       matching the prefix looked up (by [[scanByFullKeys]] API)
-    scanInternal(Option.of(KeySpec.prefixKeySpec(missingKeyPrefixes)), false);
-    scannedPrefixes.addAll(missingKeyPrefixes);
-  }
-
-  private void performScan() {
-    // Do the scan and merge
-    timer.startTimer();
-
-    scanInternal(Option.empty(), false);
-
-    this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
-    this.numMergedRecordsInLog = records.size();
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Number of log files scanned => {}", logFilePaths.size());
-      LOG.info("MaxMemoryInBytes allowed for compaction => {}", maxMemorySizeInBytes);
-      LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => {}", records.getInMemoryMapNumEntries());
-      LOG.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => {}", records.getCurrentInMemoryMapSize());
-      LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => {}", records.getDiskBasedMapNumEntries());
-      LOG.info("Size of file spilled to disk => {}", records.getSizeOfFileOnDiskInBytes());
-    }
+  protected HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, Schema readerSchema,
+                                         String latestInstantTime, Long maxMemorySizeInBytes,
+                                         boolean reverseReader, int bufferSize, String spillableMapBasePath,
+                                         Option<InstantRange> instantRange,
+                                         ExternalSpillableMap.DiskMapType diskMapType,
+                                         boolean isBitCaskDiskMapCompressionEnabled,
+                                         boolean withOperationField, boolean forceFullScan,
+                                         Option<String> partitionName,
+                                         InternalSchema internalSchema,
+                                         Option<String> keyFieldOverride,
+                                         boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
+                                         Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
+    super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, reverseReader, bufferSize,
+        spillableMapBasePath, instantRange, diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
+        partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger, hoodieTableMetaClientOption);
   }
 
   @Override
-  public Iterator<HoodieRecord> iterator() {
-    return records.iterator();
-  }
-
   public Map<String, HoodieRecord> getRecords() {
     return records;
   }
 
-  public HoodieRecordType getRecordType() {
-    return recordMerger.getRecordType();
-  }
-
-  public long getNumMergedRecordsInLog() {
-    return numMergedRecordsInLog;
-  }
-
-  /**
-   * Returns the builder for {@code HoodieMergedLogRecordScanner}.
-   */
-  public static HoodieMergedLogRecordScanner.Builder newBuilder() {
-    return new Builder();
-  }
-
   @Override
   public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
     String key = newRecord.getRecordKey();
@@ -250,12 +94,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
           newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
       // If pre-combine returns existing record, no need to update it
       if (combinedRecord.getData() != prevRecord.getData()) {
-        HoodieRecord latestHoodieRecord =
-            combinedRecord.newInstance(new HoodieKey(key, newRecord.getPartitionPath()), newRecord.getOperation());
-
-        latestHoodieRecord.unseal();
-        latestHoodieRecord.setCurrentLocation(newRecord.getCurrentLocation());
-        latestHoodieRecord.seal();
+        HoodieRecord latestHoodieRecord = getLatestHoodieRecord(newRecord, combinedRecord, key);
 
         // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
         //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
@@ -271,50 +110,15 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     }
   }
 
-  @Override
-  protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
-    String key = deleteRecord.getRecordKey();
-    HoodieRecord oldRecord = records.get(key);
-    if (oldRecord != null) {
-      // Merge and store the merged record. The ordering val is taken to decide whether the same key record
-      // should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
-      // For same ordering values, uses the natural order(arrival time semantics).
-
-      Comparable curOrderingVal = oldRecord.getOrderingValue(this.readerSchema, this.hoodieTableMetaClient.getTableConfig().getProps());
-      Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
-      // Checks the ordering value does not equal to 0
-      // because we use 0 as the default value which means natural order
-      boolean choosePrev = !deleteOrderingVal.equals(0)
-          && ReflectionUtils.isSameClass(curOrderingVal, deleteOrderingVal)
-          && curOrderingVal.compareTo(deleteOrderingVal) > 0;
-      if (choosePrev) {
-        // The DELETE message is obsolete if the old message has greater orderingVal.
-        return;
-      }
-    }
-    // Put the DELETE record
-    if (recordType == HoodieRecordType.AVRO) {
-      records.put(key, SpillableMapUtils.generateEmptyPayload(key,
-          deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue(), getPayloadClassFQN()));
-    } else {
-      HoodieEmptyRecord record = new HoodieEmptyRecord<>(new HoodieKey(key, deleteRecord.getPartitionPath()), null, deleteRecord.getOrderingValue(), recordType);
-      records.put(key, record);
-      records.put(key, record);
-    }
-  }
-
-  public long getTotalTimeTakenToReadAndMergeBlocks() {
-    return totalTimeTakenToReadAndMergeBlocks;
-  }
-
-  @Override
-  public void close() {
-    if (records != null) {
-      records.close();
-    }
+  /**
+   * Returns the builder for {@code HoodieMergedLogRecordScanner}.
+   */
+  public static HoodieMergedLogRecordScanner.Builder newBuilder() {
+    return new HoodieMergedLogRecordScanner.Builder();
   }
 
   /**
-   * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
+   * Builder used to build {@code HoodieMergedLogRecordScanner}.
    */
   public static class Builder extends AbstractHoodieLogRecordReader.Builder {
     private HoodieStorage storage;
@@ -328,8 +132,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     // specific configurations
     private Long maxMemorySizeInBytes;
     private String spillableMapBasePath;
-    private ExternalSpillableMap.DiskMapType diskMapType = HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue();
-    private boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
+    private ExternalSpillableMap.DiskMapType diskMapType = SPILLABLE_DISK_MAP_TYPE.defaultValue();
+    private boolean isBitCaskDiskMapCompressionEnabled = DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
     // incremental filtering
     private Option<InstantRange> instantRange = Option.empty();
     private String partitionName;
@@ -357,7 +161,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     @Override
     public Builder withLogFilePaths(List<String> logFilePaths) {
       this.logFilePaths = logFilePaths.stream()
-          .filter(p -> !p.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+          .filter(p -> !p.endsWith(CDC_LOGFILE_SUFFIX))
           .collect(Collectors.toList());
       return this;
     }
@@ -441,8 +245,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
       return this;
     }
 
-    public Builder withKeyFiledOverride(String keyFieldOverride) {
-      this.keyFieldOverride = Objects.requireNonNull(keyFieldOverride);
+    public Builder withKeyFieldOverride(String keyFieldOverride) {
+      this.keyFieldOverride = requireNonNull(keyFieldOverride);
       return this;
     }
 
@@ -463,7 +267,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
         this.partitionName = getRelativePartitionPath(
             new StoragePath(basePath), new StoragePath(this.logFilePaths.get(0)).getParent());
       }
-      ValidationUtils.checkArgument(recordMerger != null);
+      checkArgument(recordMerger != null);
 
       return new HoodieMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema,
           latestInstantTime, maxMemorySizeInBytes, reverseReader,
@@ -474,4 +278,3 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     }
   }
 }
-
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMetadataMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMetadataMergedLogRecordScanner.java
new file mode 100644
index 00000000000..c4a52cd6d86
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMetadataMergedLogRecordScanner.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.HoodieMetadataRecordMerger;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static org.apache.hudi.common.table.cdc.HoodieCDCUtils.CDC_LOGFILE_SUFFIX;
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
+/**
+ * Merged log record scanner for metadata table using {@link HoodieMetadataRecordMerger}.
+ */
+public class HoodieMetadataMergedLogRecordScanner extends BaseHoodieMergedLogRecordScanner<String> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieMetadataMergedLogRecordScanner.class);
+
+  private HoodieMetadataMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, Schema readerSchema,
+                                               String latestInstantTime, Long maxMemorySizeInBytes,
+                                               boolean reverseReader, int bufferSize, String spillableMapBasePath,
+                                               Option<InstantRange> instantRange,
+                                               ExternalSpillableMap.DiskMapType diskMapType,
+                                               boolean isBitCaskDiskMapCompressionEnabled,
+                                               boolean withOperationField, boolean forceFullScan,
+                                               Option<String> partitionName,
+                                               InternalSchema internalSchema,
+                                               Option<String> keyFieldOverride,
+                                               boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
+                                               Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
+    super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, reverseReader, bufferSize, spillableMapBasePath, instantRange, diskMapType,
+        isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
+        hoodieTableMetaClientOption);
+  }
+
+  @Override
+  public Map<String, HoodieRecord> getRecords() {
+    return records;
+  }
+
+  @Override
+  public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
+    // Merge the new record with the existing record in the map
+    HoodieRecord<T> oldRecord = (HoodieRecord<T>) records.get(newRecord.getRecordKey());
+    if (oldRecord != null) {
+      LOG.debug("Merging new record with existing record in the map. Key: {}", newRecord.getRecordKey());
+      recordMerger.fullOuterMerge(oldRecord, readerSchema, newRecord, readerSchema, this.getPayloadProps()).forEach(
+          mergedRecord -> {
+            HoodieRecord<T> combinedRecord = mergedRecord.getLeft();
+            if (combinedRecord.getData() != oldRecord.getData()) {
+              HoodieRecord latestHoodieRecord = getLatestHoodieRecord(newRecord, combinedRecord, newRecord.getRecordKey());
+              records.put(newRecord.getRecordKey(), latestHoodieRecord.copy());
+            }
+          });
+    } else {
+      // Put the record as is
+      // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
+      //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
+      //       it since these records will be put into records(Map).
+      records.put(newRecord.getRecordKey(), newRecord.copy());
+    }
+  }
+
+  /**
+   * Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}.
+   */
+  public static HoodieMetadataMergedLogRecordScanner.Builder newBuilder() {
+    return new HoodieMetadataMergedLogRecordScanner.Builder();
+  }
+
+  /**
+   * Builder used to build {@code HoodieMetadataMergedLogRecordScanner}.
+   */
+  public static class Builder extends AbstractHoodieLogRecordReader.Builder {
+    private HoodieStorage storage;
+    private String basePath;
+    private List<String> logFilePaths;
+    private Schema readerSchema;
+    private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema();
+    private String latestInstantTime;
+    private boolean reverseReader;
+    private int bufferSize;
+    // specific configurations
+    private Long maxMemorySizeInBytes;
+    private String spillableMapBasePath;
+    private ExternalSpillableMap.DiskMapType diskMapType = SPILLABLE_DISK_MAP_TYPE.defaultValue();
+    private boolean isBitCaskDiskMapCompressionEnabled = DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
+    // incremental filtering
+    private Option<InstantRange> instantRange = Option.empty();
+    private String partitionName;
+    // operation field default false
+    private boolean withOperationField = false;
+    private String keyFieldOverride;
+    // By default, we're doing a full-scan
+    private boolean forceFullScan = true;
+    private boolean enableOptimizedLogBlocksScan = false;
+    private HoodieRecordMerger recordMerger = HoodieMetadataRecordMerger.INSTANCE;
+    protected HoodieTableMetaClient hoodieTableMetaClient;
+
+    @Override
+    public Builder withStorage(HoodieStorage storage) {
+      this.storage = storage;
+      return this;
+    }
+
+    @Override
+    public Builder withBasePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    @Override
+    public Builder withLogFilePaths(List<String> logFilePaths) {
+      this.logFilePaths = logFilePaths.stream()
+          .filter(p -> !p.endsWith(CDC_LOGFILE_SUFFIX))
+          .collect(Collectors.toList());
+      return this;
+    }
+
+    @Override
+    public Builder withReaderSchema(Schema schema) {
+      this.readerSchema = schema;
+      return this;
+    }
+
+    @Override
+    public Builder withLatestInstantTime(String latestInstantTime) {
+      this.latestInstantTime = latestInstantTime;
+      return this;
+    }
+
+    @Override
+    public Builder withReverseReader(boolean reverseReader) {
+      this.reverseReader = reverseReader;
+      return this;
+    }
+
+    @Override
+    public Builder withBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    @Override
+    public Builder withInstantRange(Option<InstantRange> instantRange) {
+      this.instantRange = instantRange;
+      return this;
+    }
+
+    public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
+      this.maxMemorySizeInBytes = maxMemorySizeInBytes;
+      return this;
+    }
+
+    public Builder withSpillableMapBasePath(String spillableMapBasePath) {
+      this.spillableMapBasePath = spillableMapBasePath;
+      return this;
+    }
+
+    public Builder withDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
+      this.diskMapType = diskMapType;
+      return this;
+    }
+
+    public Builder withBitCaskDiskMapCompressionEnabled(boolean isBitCaskDiskMapCompressionEnabled) {
+      this.isBitCaskDiskMapCompressionEnabled = isBitCaskDiskMapCompressionEnabled;
+      return this;
+    }
+
+    @Override
+    public Builder withInternalSchema(InternalSchema internalSchema) {
+      this.internalSchema = internalSchema;
+      return this;
+    }
+
+    public Builder withOperationField(boolean withOperationField) {
+      this.withOperationField = withOperationField;
+      return this;
+    }
+
+    @Override
+    public Builder withPartition(String partitionName) {
+      this.partitionName = partitionName;
+      return this;
+    }
+
+    @Override
+    public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
+      this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
+      return this;
+    }
+
+    @Override
+    public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
+      this.recordMerger = recordMerger;
+      return this;
+    }
+
+    public Builder withKeyFieldOverride(String keyFieldOverride) {
+      this.keyFieldOverride = requireNonNull(keyFieldOverride);
+      return this;
+    }
+
+    public Builder withForceFullScan(boolean forceFullScan) {
+      this.forceFullScan = forceFullScan;
+      return this;
+    }
+
+    @Override
+    public Builder withTableMetaClient(HoodieTableMetaClient hoodieTableMetaClient) {
+      this.hoodieTableMetaClient = hoodieTableMetaClient;
+      return this;
+    }
+
+    @Override
+    public HoodieMetadataMergedLogRecordScanner build() {
+      if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFilePaths)) {
+        this.partitionName = getRelativePartitionPath(
+            new StoragePath(basePath), new StoragePath(this.logFilePaths.get(0)).getParent());
+      }
+      checkArgument(recordMerger != null);
+      checkArgument(nonEmpty(partitionName), "Partition name is required for HoodieMetadataMergedLogRecordScanner.");
+
+      return new HoodieMetadataMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema,
+          latestInstantTime, maxMemorySizeInBytes, reverseReader,
+          bufferSize, spillableMapBasePath, instantRange,
+          diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
+          Option.of(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger,
+          Option.ofNullable(hoodieTableMetaClient));
+    }
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 59605679c5d..78a3ea48e38 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -641,7 +641,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     // Load the schema
     Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
     HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
-    HoodieMetadataLogRecordReader logRecordScanner = HoodieMetadataLogRecordReader.newBuilder()
+    HoodieMetadataLogRecordReader logRecordScanner = HoodieMetadataLogRecordReader.newBuilder(partitionName)
         .withStorage(metadataMetaClient.getStorage())
         .withBasePath(metadataBasePath)
         .withLogFilePaths(sortedLogFilePaths)
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index 6705a75625e..17b7688133e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -20,7 +20,10 @@ package org.apache.hudi.metadata;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader;
+import org.apache.hudi.common.table.log.BaseHoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.HoodieMetadataMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
@@ -40,6 +43,8 @@ import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
+
 /**
  * Metadata log-block records reading implementation, internally relying on
 * {@link HoodieMergedLogRecordScanner} to merge corresponding Metadata Table's delta log-blocks
@@ -48,17 +53,17 @@ import java.util.stream.Collectors;
 @ThreadSafe
 public class HoodieMetadataLogRecordReader implements Closeable {
 
-  private final HoodieMergedLogRecordScanner logRecordScanner;
+  private final BaseHoodieMergedLogRecordScanner<String> logRecordScanner;
 
-  private HoodieMetadataLogRecordReader(HoodieMergedLogRecordScanner logRecordScanner) {
+  private HoodieMetadataLogRecordReader(BaseHoodieMergedLogRecordScanner logRecordScanner) {
     this.logRecordScanner = logRecordScanner;
   }
 
   /**
    * Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}.
    */
-  public static HoodieMetadataLogRecordReader.Builder newBuilder() {
-    return new HoodieMetadataLogRecordReader.Builder();
+  public static HoodieMetadataLogRecordReader.Builder newBuilder(String partitionName) {
+    return new HoodieMetadataLogRecordReader.Builder(partitionName);
   }
 
   @SuppressWarnings("unchecked")
@@ -150,14 +155,20 @@ public class HoodieMetadataLogRecordReader implements Closeable {
    * Builder used to build {@code HoodieMetadataMergedLogRecordScanner}.
    */
   public static class Builder {
-    private final HoodieMergedLogRecordScanner.Builder scannerBuilder =
-        new HoodieMergedLogRecordScanner.Builder()
-            .withKeyFiledOverride(HoodieMetadataPayload.KEY_FIELD_NAME)
-            // NOTE: Merging of Metadata Table's records is currently handled using {@code HoodiePreCombineAvroRecordMerger}
-            //       for compatibility purposes; In the future it {@code HoodieMetadataPayload} semantic
-            //       will be migrated to its own custom instance of {@code RecordMerger}
-            .withReverseReader(false)
-            .withOperationField(false);
+    private final AbstractHoodieLogRecordReader.Builder scannerBuilder;
+    private final String partitionName;
+
+    public Builder(String partitionName) {
+      this.partitionName = partitionName;
+      this.scannerBuilder = shouldUseMetadataMergedLogRecordScanner() ? HoodieMetadataMergedLogRecordScanner.newBuilder() : HoodieMergedLogRecordScanner.newBuilder();
+      scannerBuilder
+          .withKeyFieldOverride(HoodieMetadataPayload.KEY_FIELD_NAME)
+          // NOTE: Merging of Metadata Table's records is currently handled using {@code HoodiePreCombineAvroRecordMerger}
+          //       for compatibility purposes; In the future it {@code HoodieMetadataPayload} semantic
+          //       will be migrated to its own custom instance of {@code RecordMerger}
+          .withReverseReader(false)
+          .withOperationField(false);
+    }
 
     public Builder withStorage(HoodieStorage storage) {
       scannerBuilder.withStorage(storage);
@@ -238,7 +249,11 @@ public class HoodieMetadataLogRecordReader implements Closeable {
     }
 
     public HoodieMetadataLogRecordReader build() {
-      return new HoodieMetadataLogRecordReader(scannerBuilder.build());
+      return new HoodieMetadataLogRecordReader((BaseHoodieMergedLogRecordScanner) scannerBuilder.build());
+    }
+
+    private boolean shouldUseMetadataMergedLogRecordScanner() {
+      return PARTITION_NAME_SECONDARY_INDEX.equals(partitionName);
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 6441f85c532..a274d07b027 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -156,6 +156,7 @@ public class HoodieTableMetadataUtil {
   public static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
   public static final String PARTITION_NAME_RECORD_INDEX = "record_index";
   public static final String PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX = "func_index_";
+  public static final String PARTITION_NAME_SECONDARY_INDEX = "secondary_index";
 
   private HoodieTableMetadataUtil() {
   }
@@ -1278,7 +1279,7 @@ public class HoodieTableMetadataUtil {
    * it could subsequently be used in column stats
    *
    * NOTE: This method has to stay compatible with the semantic of
-   *      {@link ParquetUtils#readColumnStatsFromMetadata} as they are used in tandem
+   *      {@link FileFormatUtils#readColumnStatsFromMetadata} as they are used in tandem
    */
   private static Comparable<?> coerceToComparable(Schema schema, Object val) {
     if (val == null) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java
new file mode 100644
index 00000000000..b63eb9e1531
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieMetadataRecordMerger.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link HoodieMetadataRecordMerger}.
+ */
+public class TestHoodieMetadataRecordMerger {
+
+  private HoodieTestDataGenerator dataGen;
+
+  @BeforeEach
+  public void setUp() {
+    dataGen = new HoodieTestDataGenerator();
+  }
+
+  @AfterEach
+  public void cleanUp() {
+    if (dataGen != null) {
+      dataGen = null;
+    }
+  }
+
+  @Test
+  public void testFullOuterMerge() throws IOException {
+    List<HoodieRecord> newRecordList = dataGen.generateInserts("000", 1);
+    List<HoodieRecord> updateRecordList = dataGen.generateUpdates("0001", newRecordList);
+    HoodieMetadataRecordMerger recordMerger = new HoodieMetadataRecordMerger();
+    List<Pair<HoodieRecord, Schema>> mergedRecords = recordMerger.fullOuterMerge(newRecordList.get(0), AVRO_SCHEMA, updateRecordList.get(0), AVRO_SCHEMA, new TypedProperties());
+    assertEquals(1, mergedRecords.size());
+    assertEquals(updateRecordList.get(0), mergedRecords.get(0).getLeft());
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 0d4bd3501e3..516ce425d1e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -614,7 +614,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
 
   @ParameterizedTest
   @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
-  def testPreCombineFiledForReadMOR(recordType: HoodieRecordType): Unit = {
+  def testPreCombineFieldForReadMOR(recordType: HoodieRecordType): Unit = {
     val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
 
     writeData((1, "a0", 10, 100, false), writeOpts)
