This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d2ea5236458 [HUDI-9311] Revert HUDI-7146 which causes perf overhead 
for merging MDT log files (#13136)
d2ea5236458 is described below

commit d2ea523645856dd0cc1873caa7aabd89f6052e14
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Apr 14 04:21:03 2025 -0700

    [HUDI-9311] Revert HUDI-7146 which causes perf overhead for merging MDT log 
files (#13136)
---
 .../hudi/index/SparkMetadataTableRecordIndex.java  |   9 +-
 .../hudi/client/functional/TestHoodieIndex.java    |  48 +++----
 .../table/log/block/HoodieHFileDataBlock.java      |  11 --
 .../hudi/common/table/view/NoOpTableMetadata.java  |   9 +-
 .../apache/hudi/metadata/BaseTableMetadata.java    |  20 ++-
 .../metadata/FileSystemBackedTableMetadata.java    |   9 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 149 ---------------------
 .../metadata/HoodieMetadataLogRecordReader.java    |  23 +---
 .../apache/hudi/metadata/HoodieTableMetadata.java  |  16 +--
 .../org/apache/hudi/common/util/HFileUtils.java    |  45 ++++---
 .../hudi/io/hadoop/HoodieAvroHFileWriter.java      |   5 +-
 .../hudi/io/hadoop/HoodieHBaseAvroHFileReader.java |  83 ++----------
 .../hadoop/TestHoodieHBaseHFileReaderWriter.java   |  62 ---------
 .../io/hadoop/TestHoodieHFileReaderWriterBase.java |   2 +-
 .../org/apache/hudi/RecordLevelIndexSupport.scala  |   6 +-
 .../org/apache/hudi/SecondaryIndexSupport.scala    |   6 +-
 .../functional/TestHoodieBackedMetadata.java       |  13 +-
 .../hudi/functional/RecordLevelIndexTestBase.scala |   5 +-
 .../hudi/functional/TestMetadataRecordIndex.scala  |   5 +-
 19 files changed, 97 insertions(+), 429 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
index 3c5d47eefe6..7811d02f2b5 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
@@ -80,6 +80,10 @@ public class SparkMetadataTableRecordIndex extends 
HoodieIndex<Object, Object> {
       HoodieWriteConfig otherConfig = 
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
           
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
       HoodieIndex fallbackIndex = 
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+      // Fallback index needs to be a global index like record index
+      ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index 
needs to be a global index like record index");
+
       return fallbackIndex.tagLocation(records, context, hoodieTable);
     }
 
@@ -163,10 +167,9 @@ public class SparkMetadataTableRecordIndex extends 
HoodieIndex<Object, Object> {
       recordKeyIterator.forEachRemaining(keysToLookup::add);
 
       // recordIndexInfo object only contains records that are present in 
record_index.
-      Map<String, List<HoodieRecordGlobalLocation>> recordIndexInfo = 
hoodieTable.getMetadataTable().readRecordIndex(keysToLookup);
+      Map<String, HoodieRecordGlobalLocation> recordIndexInfo = 
hoodieTable.getMetadataTable().readRecordIndex(keysToLookup);
       return recordIndexInfo.entrySet().stream()
-          .flatMap(e -> e.getValue().stream().map(loc -> new 
Tuple2<>(e.getKey(), loc)))
-          .iterator();
+          .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator();
     }
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index d4cd1000f5d..72beb2f3302 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -194,11 +194,11 @@ public class TestHoodieIndex extends 
TestHoodieMetadataBase {
         new RawTripTestPayload(recordStr4).toHoodieRecord());
   }
 
-  private static List<HoodieRecord> getInsertsWithSameKeyInTwoPartitions() 
throws IOException {
-    String recordStr1 = 
"{\"_row_key\":\"001\",\"time\":\"2016-01-31T00:00:01.000Z\",\"number\":1}";
-    String recordStr2 = 
"{\"_row_key\":\"002\",\"time\":\"2016-01-31T00:00:02.000Z\",\"number\":2}";
-    String recordStr3 = 
"{\"_row_key\":\"003\",\"time\":\"2016-01-31T00:00:03.000Z\",\"number\":3}";
-    String recordStr4 = 
"{\"_row_key\":\"003\",\"time\":\"2015-01-31T00:00:04.000Z\",\"number\":4}";
+  private static List<HoodieRecord> getInsertsBatch2() throws IOException {
+    String recordStr1 = 
"{\"_row_key\":\"005\",\"time\":\"2016-01-31T00:00:01.000Z\",\"number\":5}";
+    String recordStr2 = 
"{\"_row_key\":\"006\",\"time\":\"2016-01-31T00:00:02.000Z\",\"number\":6}";
+    String recordStr3 = 
"{\"_row_key\":\"007\",\"time\":\"2016-01-31T00:00:03.000Z\",\"number\":7}";
+    String recordStr4 = 
"{\"_row_key\":\"008\",\"time\":\"2017-01-31T00:00:04.000Z\",\"number\":8}";
     return Arrays.asList(
         new RawTripTestPayload(recordStr1).toHoodieRecord(),
         new RawTripTestPayload(recordStr2).toHoodieRecord(),
@@ -206,34 +206,16 @@ public class TestHoodieIndex extends 
TestHoodieMetadataBase {
         new RawTripTestPayload(recordStr4).toHoodieRecord());
   }
 
-  @Test
-  public void testRecordIndexForNonGlobalWrites() throws Exception {
-    setUp(IndexType.RECORD_INDEX, true, true);
-    final int totalRecords = 4;
-    List<HoodieRecord> records = getInsertsWithSameKeyInTwoPartitions();
-    JavaRDD<HoodieRecord> writtenRecords = jsc.parallelize(records, 1);
-
-    // Insert totalRecords records
-    String newCommitTime = writeClient.createNewInstantTime();
-    writeClient.startCommitWithTime(newCommitTime);
-    JavaRDD<WriteStatus> writeStatusRdd = writeClient.upsert(writtenRecords, 
newCommitTime);
-    List<WriteStatus> writeStatuses = writeStatusRdd.collect();
-    assertNoWriteErrors(writeStatuses);
-    String[] fileIdsFromWriteStatuses = 
writeStatuses.stream().map(WriteStatus::getFileId)
-        .sorted().toArray(String[]::new);
-
-    // Now commit this & update location of records inserted and validate no 
errors
-    writeClient.commit(newCommitTime, writeStatusRdd);
-    // Now tagLocation for these records, index should tag them correctly
-    metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, 
metaClient);
-    JavaRDD<HoodieRecord> javaRDD = tagLocation(index, writtenRecords, 
hoodieTable);
-    Map<String, String> recordKeyToPartitionPathMap = new HashMap();
-    List<HoodieRecord> hoodieRecords = writtenRecords.collect();
-    hoodieRecords.forEach(entry -> 
recordKeyToPartitionPathMap.put(entry.getRecordKey(), 
entry.getPartitionPath()));
-    String[] taggedFileIds = javaRDD.map(record -> 
record.getCurrentLocation().getFileId()).distinct().collect()
-        .stream().sorted().toArray(String[]::new);
-    assertArrayEquals(taggedFileIds, fileIdsFromWriteStatuses);
+  private static List<HoodieRecord> getUpdates() throws IOException {
+    String recordStr1 = 
"{\"_row_key\":\"001\",\"time\":\"2016-01-31T00:00:01.000Z\",\"number\":5}";
+    String recordStr2 = 
"{\"_row_key\":\"002\",\"time\":\"2016-01-31T00:00:02.000Z\",\"number\":6}";
+    String recordStr3 = 
"{\"_row_key\":\"003\",\"time\":\"2016-01-31T00:00:03.000Z\",\"number\":7}";
+    String recordStr4 = 
"{\"_row_key\":\"004\",\"time\":\"2017-01-31T00:00:04.000Z\",\"number\":8}";
+    return new ArrayList<>(Arrays.asList(
+        new RawTripTestPayload(recordStr1).toHoodieRecord(),
+        new RawTripTestPayload(recordStr2).toHoodieRecord(),
+        new RawTripTestPayload(recordStr3).toHoodieRecord(),
+        new RawTripTestPayload(recordStr4).toHoodieRecord()));
   }
 
   @ParameterizedTest
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index ea474142463..a71bf203246 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.table.log.block;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -38,7 +37,6 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.inline.InLineFSUtils;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -185,15 +183,6 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     }
   }
 
-  /**
-   * Print the record in json format
-   */
-  private void printRecord(String msg, byte[] bs, Schema schema) throws 
IOException {
-    GenericRecord record = HoodieAvroUtils.bytesToAvro(bs, schema);
-    byte[] json = HoodieAvroUtils.avroToJson(record, true);
-    LOG.error(String.format("%s: %s", msg, new String(json)));
-  }
-
   private HoodieConfig getHFileReaderConfig(boolean useNativeHFileReader) {
     HoodieConfig config = new HoodieConfig();
     config.setValue(
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
index 1065adf8227..ae1e36ac39e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
@@ -97,12 +97,12 @@ class NoOpTableMetadata implements HoodieTableMetadata {
   }
 
   @Override
-  public Map<String, List<HoodieRecordGlobalLocation>> 
readRecordIndex(List<String> recordKeys) {
+  public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> 
recordKeys) {
     throw new HoodieMetadataException("Unsupported operation: 
readRecordIndex!");
   }
 
   @Override
-  public Map<String, List<HoodieRecordGlobalLocation>> 
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+  public Map<String, HoodieRecordGlobalLocation> 
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
     return Collections.emptyMap();
   }
 
@@ -111,11 +111,6 @@ class NoOpTableMetadata implements HoodieTableMetadata {
     throw new HoodieMetadataException("Unsupported operation: 
getRecordsByKeyPrefixes!");
   }
 
-  @Override
-  public Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
getAllRecordsByKeys(List<String> keys, String partitionName) {
-    return Collections.emptyMap();
-  }
-
   @Override
   public Option<String> getSyncedInstantTime() {
     throw new HoodieMetadataException("Unsupported operation: 
getSyncedInstantTime!");
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 0ffbce2ff4f..671ff75ab5c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -28,8 +28,8 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
@@ -287,7 +287,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
    * @param recordKeys The list of record keys to read
    */
   @Override
-  public Map<String, List<HoodieRecordGlobalLocation>> 
readRecordIndex(List<String> recordKeys) {
+  public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> 
recordKeys) {
     // If record index is not initialized yet, we cannot return an empty 
result here unlike the code for reading from other
     // indexes. This is because results from this function are used for 
upserts and returning an empty result here would lead
     // to existing records being inserted again causing duplicates.
@@ -296,15 +296,13 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
         "Record index is not initialized in MDT");
 
     HoodieTimer timer = HoodieTimer.start();
-    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> result = 
getAllRecordsByKeys(recordKeys, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
-    Map<String, List<HoodieRecordGlobalLocation>> recordKeyToLocation = new 
HashMap<>(result.size());
-    result.forEach((key, records) -> records.forEach(record -> {
+    Map<String, HoodieRecord<HoodieMetadataPayload>> result = 
getRecordsByKeys(recordKeys, 
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+    Map<String, HoodieRecordGlobalLocation> recordKeyToLocation = new 
HashMap<>(result.size());
+    result.forEach((key, record) -> {
       if (!record.getData().isDeleted()) {
-        List<HoodieRecordGlobalLocation> locations = 
recordKeyToLocation.getOrDefault(key, new ArrayList<>());
-        locations.add(record.getData().getRecordGlobalLocation());
-        recordKeyToLocation.put(key, locations);
+        recordKeyToLocation.put(key, 
record.getData().getRecordGlobalLocation());
       }
-    }));
+    });
 
     metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_RECORD_INDEX_TIME_STR, 
timer.endTimer()));
     metrics.ifPresent(m -> 
m.setMetric(HoodieMetadataMetrics.LOOKUP_RECORD_INDEX_KEYS_COUNT_STR, 
recordKeys.size()));
@@ -321,7 +319,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
    * @param secondaryKeys The list of secondary keys to read
    */
   @Override
-  public Map<String, List<HoodieRecordGlobalLocation>> 
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+  public Map<String, HoodieRecordGlobalLocation> 
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
     
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
         "Record index is not initialized in MDT");
     ValidationUtils.checkState(
@@ -430,7 +428,7 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
   }
 
   /**
-   * Handle spurious deletes. Depending on config, throw an exception or log 
warn msg.
+   * Handle spurious deletes. Depending on config, throw an exception or log a 
warn msg.
    */
   private void checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload, 
String partitionName) {
     if (!metadataPayload.getDeletions().isEmpty()) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index bd0f4904d29..1677b4b4b88 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -309,17 +309,12 @@ public class FileSystemBackedTableMetadata extends 
AbstractHoodieTableMetadata {
   }
 
   @Override
-  public Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
getAllRecordsByKeys(List<String> keys, String partitionName) {
-    throw new HoodieMetadataException("Unsupported operation: 
getAllRecordsByKeys!");
-  }
-
-  @Override
-  public Map<String, List<HoodieRecordGlobalLocation>> 
readRecordIndex(List<String> recordKeys) {
+  public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> 
recordKeys) {
     throw new HoodieMetadataException("Unsupported operation: 
readRecordIndex!");
   }
 
   @Override
-  public Map<String, List<HoodieRecordGlobalLocation>> 
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+  public Map<String, HoodieRecordGlobalLocation> 
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
     throw new HoodieMetadataException("Unsupported operation: 
readSecondaryIndex!");
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 87433319719..ea71cc83a45 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -297,41 +296,6 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     return partitionedKeys;
   }
 
-  @Override
-  public Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
getAllRecordsByKeys(List<String> keys, String partitionName) {
-    if (keys.isEmpty()) {
-      return Collections.emptyMap();
-    }
-
-    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> result;
-    // Load the file slices for the partition. Each file slice is a shard 
which saves a portion of the keys.
-    List<FileSlice> partitionFileSlices = 
partitionFileSliceMap.computeIfAbsent(partitionName,
-        k -> 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
getMetadataFileSystemView(), partitionName));
-    final int numFileSlices = partitionFileSlices.size();
-    checkState(numFileSlices > 0, "Number of file slices for partition " + 
partitionName + " should be > 0");
-
-    // Lookup keys from each file slice
-    if (numFileSlices == 1) {
-      // Optimization for a single slice for smaller metadata table partitions
-      result = lookupAllKeysFromFileSlice(partitionName, keys, 
partitionFileSlices.get(0));
-    } else {
-      // Parallel lookup for large sized partitions with many file slices
-      // Partition the keys by the file slice which contains it
-      ArrayList<ArrayList<String>> partitionedKeys = 
partitionKeysByFileSlices(keys, numFileSlices);
-      result = new HashMap<>(keys.size());
-      getEngineContext().setJobStatus(this.getClass().getSimpleName(), 
"Reading keys from metadata table partition " + partitionName);
-      getEngineContext().map(partitionedKeys, keysList -> {
-        if (keysList.isEmpty()) {
-          return Collections.<String, 
HoodieRecord<HoodieMetadataPayload>>emptyMap();
-        }
-        int shardIndex = 
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(keysList.get(0), 
numFileSlices);
-        return lookupAllKeysFromFileSlice(partitionName, keysList, 
partitionFileSlices.get(shardIndex));
-      }, partitionedKeys.size()).forEach(map -> result.putAll((Map<String, 
List<HoodieRecord<HoodieMetadataPayload>>>) map));
-    }
-
-    return result;
-  }
-
   /**
    * Lookup list of keys from a single file slice.
    *
@@ -440,119 +404,6 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     return result;
   }
 
-  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
lookupAllKeysFromFileSlice(String partitionName, List<String> keys, FileSlice 
fileSlice) {
-    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = 
getOrCreateReaders(partitionName, fileSlice);
-    try {
-      List<Long> timings = new ArrayList<>();
-      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
-      if (baseFileReader == null && logRecordScanner == null) {
-        return Collections.emptyMap();
-      }
-
-      // Sort it here once so that we don't need to sort individually for base 
file and for each individual log files.
-      List<String> sortedKeys = new ArrayList<>(keys);
-      Collections.sort(sortedKeys);
-      Map<String, List<HoodieRecord<HoodieMetadataPayload>>> logRecords = 
readAllLogRecords(logRecordScanner, sortedKeys, timings);
-      return readFromBaseAndMergeWithAllLogRecords(baseFileReader, sortedKeys, 
true, logRecords, timings, partitionName);
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error merging records from metadata table 
for  " + keys.size() + " key : ", ioe);
-    } finally {
-      if (!reuse) {
-        closeReader(readers);
-      }
-    }
-  }
-
-  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
readAllLogRecords(HoodieMetadataLogRecordReader logRecordReader,
-                                                                               
    List<String> sortedKeys,
-                                                                               
    List<Long> timings) {
-    HoodieTimer timer = HoodieTimer.start();
-
-    if (logRecordReader == null) {
-      timings.add(timer.endTimer());
-      return Collections.emptyMap();
-    }
-
-    try {
-      return logRecordReader.getAllRecordsByKeys(sortedKeys);
-    } finally {
-      timings.add(timer.endTimer());
-    }
-  }
-
-  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
readFromBaseAndMergeWithAllLogRecords(HoodieSeekingFileReader<?> reader,
-                                                                               
                        List<String> sortedKeys,
-                                                                               
                        boolean fullKeys,
-                                                                               
                        Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
logRecords,
-                                                                               
                        List<Long> timings,
-                                                                               
                        String partitionName) throws IOException {
-    HoodieTimer timer = HoodieTimer.start();
-
-    if (reader == null) {
-      // No base file at all
-      timings.add(timer.endTimer());
-      return logRecords;
-    }
-
-    HoodieTimer readTimer = HoodieTimer.start();
-
-    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> records =
-        fetchBaseFileAllRecordsByKeys(reader, sortedKeys, fullKeys, 
partitionName);
-
-    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
-
-    // Iterate over all provided log-records, merging them into existing 
records
-
-    logRecords.entrySet().forEach(kv -> {
-      records.merge(
-          kv.getKey(),
-          kv.getValue(),
-          (oldRecordList, newRecordList) -> {
-            List<HoodieRecord<HoodieMetadataPayload>> mergedRecordList = new 
ArrayList<>();
-            HoodieMetadataPayload mergedPayload = null;
-            HoodieKey key = null;
-            if (!oldRecordList.isEmpty() && !newRecordList.isEmpty()) {
-              mergedPayload = 
newRecordList.get(0).getData().preCombine(oldRecordList.get(0).getData());
-              key = newRecordList.get(0).getKey();
-            } else if (!oldRecordList.isEmpty()) {
-              mergedPayload = oldRecordList.get(0).getData();
-              key = oldRecordList.get(0).getKey();
-            } else if (!newRecordList.isEmpty()) {
-              mergedPayload = newRecordList.get(0).getData();
-              key = newRecordList.get(0).getKey();
-            }
-
-            if (mergedPayload != null && !mergedPayload.isDeleted()) {
-              mergedRecordList.add(new HoodieAvroRecord<>(key, mergedPayload));
-            }
-            return mergedRecordList;
-          }
-      );
-    });
-
-    timings.add(timer.endTimer());
-    return records;
-  }
-
-  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
fetchBaseFileAllRecordsByKeys(HoodieSeekingFileReader reader,
-                                                                               
                List<String> sortedKeys,
-                                                                               
                boolean fullKeys,
-                                                                               
                String partitionName) throws IOException {
-    ClosableIterator<HoodieRecord<?>> records = fullKeys
-        ? reader.getRecordsByKeysIterator(sortedKeys)
-        : reader.getRecordsByKeyPrefixIterator(sortedKeys);
-
-    return toStream(records)
-        .map(record -> {
-          GenericRecord data = (GenericRecord) record.getData();
-          return Pair.of(
-              (String) (data).get(HoodieMetadataPayload.KEY_FIELD_NAME),
-              composeRecord(data, partitionName));
-        })
-        .collect(Collectors.groupingBy(Pair::getKey, 
Collectors.mapping(Pair::getValue, Collectors.toList())));
-  }
-
   private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord 
avroRecord, String partitionName) {
     if (metadataTableConfig.populateMetaFields()) {
       return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index 0277672237a..92a6841103b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.avro.Schema;
+
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
@@ -27,8 +29,6 @@ import 
org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.avro.Schema;
-
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.Closeable;
@@ -112,24 +112,7 @@ public class HoodieMetadataLogRecordReader implements 
Closeable {
       return sortedKeys.stream()
           .map(key -> (HoodieRecord<HoodieMetadataPayload>) 
allRecords.get(key))
           .filter(Objects::nonNull)
-          .collect(Collectors.toMap(HoodieRecord::getRecordKey, r -> r, (r1, 
r2) -> r2));
-    }
-  }
-
-  public Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
getAllRecordsByKeys(List<String> sortedKeys) {
-    if (sortedKeys.isEmpty()) {
-      return Collections.emptyMap();
-    }
-
-    // NOTE: Locking is necessary since we're accessing 
[[HoodieMetadataLogRecordReader]]
-    //       materialized state, to make sure there's no concurrent access
-    synchronized (this) {
-      logRecordScanner.scanByFullKeys(sortedKeys);
-      Map<String, HoodieRecord> allRecords = logRecordScanner.getRecords();
-      return sortedKeys.stream()
-          .map(key -> (HoodieRecord<HoodieMetadataPayload>) 
allRecords.get(key))
-          .filter(Objects::nonNull)
-          .collect(Collectors.groupingBy(HoodieRecord::getRecordKey));
+          .collect(Collectors.toMap(HoodieRecord::getRecordKey, r -> r));
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 47db4b33736..ff8eae014e1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -189,6 +189,7 @@ public interface HoodieTableMetadata extends Serializable, 
AutoCloseable {
 
   /**
    * Fetch all files for given partition paths.
+   *
    * NOTE: Absolute partition paths are expected here
    */
   Map<String, List<StoragePathInfo>> 
getAllFilesInPartitions(Collection<String> partitionPaths)
@@ -255,15 +256,15 @@ public interface HoodieTableMetadata extends 
Serializable, AutoCloseable {
 
   /**
    * Returns the location of record keys which are found in the record index.
-   * Records that are not found are ignored and won't be part of map object 
that is returned.
+   * Records that are not found are ignored and wont be part of map object 
that is returned.
    */
-  Map<String, List<HoodieRecordGlobalLocation>> readRecordIndex(List<String> 
recordKeys);
+  Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> 
recordKeys);
 
   /**
    * Returns the location of records which the provided secondary keys maps to.
    * Records that are not found are ignored and won't be part of map object 
that is returned.
    */
-  Map<String, List<HoodieRecordGlobalLocation>> 
readSecondaryIndex(List<String> secondaryKeys, String partitionName);
+  Map<String, HoodieRecordGlobalLocation> readSecondaryIndex(List<String> 
secondaryKeys, String partitionName);
 
   /**
    * Fetch records by key prefixes. Key prefix passed is expected to match the 
same prefix as stored in Metadata table partitions. For eg, in case of col 
stats partition,
@@ -277,15 +278,6 @@ public interface HoodieTableMetadata extends Serializable, 
AutoCloseable {
                                                                           
String partitionName,
                                                                           
boolean shouldLoadInMemory);
 
-  /**
-   * Fetch records for given keys. A key could have multiple records 
associated with it. This method returns all the records for given keys.
-   *
-   * @param keys          list of key for which interested records are looked 
up for.
-   * @param partitionName partition name in metadata table where the records 
are looked up for.
-   * @return Map of key to {@link List} of {@link HoodieRecord}s with records 
matching the passed in keys.
-   */
-  Map<String, List<HoodieRecord<HoodieMetadataPayload>>> 
getAllRecordsByKeys(List<String> keys, String partitionName);
-
   /**
    * Get the instant time to which the metadata is synced w.r.t data timeline.
    */
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index 0d423251e6e..314a7d0dcfe 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.compress.CompressionCodec;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
@@ -51,7 +52,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -185,7 +185,7 @@ public class HFileUtils extends FileFormatUtils {
     int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 
1 : -1;
 
     // Serialize records into bytes
-    Map<String, List<byte[]>> sortedRecordsMap = new TreeMap<>();
+    Map<String, byte[]> sortedRecordsMap = new TreeMap<>();
 
     Iterator<HoodieRecord> itr = records.iterator();
     int id = 0;
@@ -199,26 +199,26 @@ public class HFileUtils extends FileFormatUtils {
       }
 
       final byte[] recordBytes = serializeRecord(record, writerSchema, 
keyFieldName);
-      // If key exists in the map, append to its list. If not, create a new 
list.
-      // Get the existing list of recordBytes for the recordKey, or an empty 
list if it doesn't exist
-      List<byte[]> recordBytesList = sortedRecordsMap.getOrDefault(recordKey, 
new ArrayList<>());
-      recordBytesList.add(recordBytes);
-      // Put the updated list back into the map
-      sortedRecordsMap.put(recordKey, recordBytesList);
+      if (sortedRecordsMap.containsKey(recordKey)) {
+        LOG.error("Found duplicate record with recordKey: {} ", recordKey);
+        logRecordMetadata("Previous record", sortedRecordsMap.get(recordKey), 
writerSchema);
+        logRecordMetadata("Current record", recordBytes, writerSchema);
+        throw new HoodieException(String.format("Writing multiple records with 
same key %s not supported for Hfile format with Metadata table",
+            recordKey));
+      }
+      sortedRecordsMap.put(recordKey, recordBytes);
     }
 
     HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
         .withOutputStream(ostream).withFileContext(context).create();
 
     // Write the records
-    sortedRecordsMap.forEach((recordKey, recordBytesList) -> {
-      for (byte[] recordBytes : recordBytesList) {
-        try {
-          KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, 
recordBytes);
-          writer.append(kv);
-        } catch (IOException e) {
-          throw new HoodieIOException("IOException serializing records", e);
-        }
+    sortedRecordsMap.forEach((recordKey, recordBytes) -> {
+      try {
+        KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, 
recordBytes);
+        writer.append(kv);
+      } catch (IOException e) {
+        throw new HoodieIOException("IOException serializing records", e);
       }
     });
 
@@ -232,6 +232,19 @@ public class HFileUtils extends FileFormatUtils {
     return baos.toByteArray();
   }
 
+  /**
+   * Logs the Hudi meta fields of the given serialized record, to help 
diagnose duplicate record keys.
+   */
+  private void logRecordMetadata(String msg, byte[] bs, Schema schema) throws 
IOException {
+    GenericRecord record = HoodieAvroUtils.bytesToAvro(bs, schema);
+    if (schema.getField(HoodieRecord.RECORD_KEY_METADATA_FIELD) != null) {
+      LOG.error("{}: Hudi meta field values -> Record key: {}, Partition Path: 
{}, FileName: {}, CommitTime: {}, CommitSeqNo: {}", msg,
+          record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD), 
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+          record.get(HoodieRecord.FILENAME_METADATA_FIELD), 
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD),
+          record.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD));
+    }
+  }
+
   @Override
   public Pair<byte[], Object> serializeRecordsToLogBlock(
       HoodieStorage storage,
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
index 6967f19246d..b81f57beb69 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
@@ -29,7 +29,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.io.storage.HoodieAvroFileWriter;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
@@ -141,9 +140,7 @@ public class HoodieAvroHFileWriter
 
   @Override
   public void writeAvro(String recordKey, IndexedRecord record) throws 
IOException {
-    // do not allow duplicates for record index (primary index)
-    // secondary index can have duplicates
-    if (prevRecordKey.equals(recordKey) && 
file.getName().startsWith(MetadataPartitionType.RECORD_INDEX.getFileIdPrefix()))
 {
+    if (prevRecordKey.equals(recordKey)) {
       throw new HoodieDuplicateKeyException("Duplicate recordKey " + recordKey 
+ " found while writing to HFile."
           + "Record payload: " + record);
     }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHBaseAvroHFileReader.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHBaseAvroHFileReader.java
index cabff429f13..dfcc36d2ab1 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHBaseAvroHFileReader.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHBaseAvroHFileReader.java
@@ -370,11 +370,9 @@ public class HoodieHBaseAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
     return new KeyPrefixIterator();
   }
 
-  private static Iterator<IndexedRecord> 
getRecordByKeyIteratorInternal(HFileScanner scanner,
-                                                                        String 
key,
-                                                                        Schema 
writerSchema,
-                                                                        Schema 
readerSchema) throws IOException {
-    KeyValue kv = new KeyValue(getUTF8Bytes(key), null, null, null);
+  private static Option<IndexedRecord> fetchRecordByKeyInternal(HFileScanner 
scanner, String key, Schema writerSchema, Schema readerSchema) throws 
IOException {
+    byte[] keyBytes = getUTF8Bytes(key);
+    KeyValue kv = new KeyValue(keyBytes, null, null, null);
     // NOTE: HFile persists both keys/values as bytes, therefore 
lexicographical sorted is
     //       essentially employed
     //
@@ -384,59 +382,22 @@ public class HoodieHBaseAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
     //    b) 0, such that c[i] = cell and scanner is left in position i;
     //    c) and 1, such that c[i] < cell, and scanner is left in position i.
     // In summary, with exact match, we are interested in return value of 0. 
in all other cases, key is not found.
-    // Also, do remember we are using reseekTo(), which means, the cursor will 
not rewind after searching for a key.
-    // Let's say the file contains key01, key02, .., key20.
+    // Also, do remember we are using reseekTo(), which means, the cursor will 
not rewind after searching for a key.
+    // Let's say the file contains key01, key02, .., key20.
     // After searching for key09, if we search for key05, it may not return 
the matching entry since just after reseeking to key09, the cursor is at key09 
and
     // further reseek calls may not look back in positions.
-    int val = scanner.reseekTo(kv);
-    if (val != 0) {
+
+    if (scanner.reseekTo(kv) != 0) {
       // key is not found.
-      return Collections.emptyIterator();
+      return Option.empty();
     }
 
     // key is found and the cursor is left where the key is found
-    class KeyIterator implements Iterator<IndexedRecord> {
-      private IndexedRecord next = null;
-      private boolean eof = false;
-
-      @Override
-      public boolean hasNext() {
-        if (next != null) {
-          return true;
-        } else if (eof) {
-          return false;
-        }
+    Cell c = scanner.getCell();
+    byte[] valueBytes = copyValueFromCell(c);
+    GenericRecord record = deserialize(keyBytes, valueBytes, writerSchema, 
readerSchema);
 
-        Cell c = Objects.requireNonNull(scanner.getCell());
-        byte[] keyBytes = copyKeyFromCell(c);
-        String currentKey = new String(keyBytes);
-        // Check whether we're still reading records corresponding to the 
key-prefix
-        if (!currentKey.equals(key)) {
-          return false;
-        }
-
-        // Extract the byte value before releasing the lock since we cannot 
hold on to the returned cell afterward
-        byte[] valueBytes = copyValueFromCell(c);
-        try {
-          next = deserialize(keyBytes, valueBytes, writerSchema, readerSchema);
-          // In case scanner is not able to advance, it means we reached EOF
-          eof = !scanner.next();
-        } catch (IOException e) {
-          throw new HoodieIOException("Failed to deserialize payload", e);
-        }
-
-        return true;
-      }
-
-      @Override
-      public IndexedRecord next() {
-        IndexedRecord next = this.next;
-        this.next = null;
-        return next;
-      }
-    }
-
-    return new KeyIterator();
+    return Option.of(record);
   }
 
   private static GenericRecord getRecordFromCell(Cell cell, Schema 
writerSchema, Schema readerSchema) throws IOException {
@@ -550,7 +511,6 @@ public class HoodieHBaseAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
     private final Schema writerSchema;
 
     private IndexedRecord next = null;
-    private Iterator<IndexedRecord> currentRecordIterator = null;
 
     RecordByKeyIterator(HFile.Reader reader, HFileScanner scanner, 
List<String> sortedKeys, Schema writerSchema, Schema readerSchema) throws 
IOException {
       this.sortedKeyIterator = sortedKeys.iterator();
@@ -570,28 +530,13 @@ public class HoodieHBaseAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
           return true;
         }
 
-        // Continue returning records for the current key, if any left
-        while (currentRecordIterator != null && 
currentRecordIterator.hasNext()) {
-          Option<IndexedRecord> value = 
Option.of(currentRecordIterator.next());
+        while (sortedKeyIterator.hasNext()) {
+          Option<IndexedRecord> value = fetchRecordByKeyInternal(scanner, 
sortedKeyIterator.next(), writerSchema, readerSchema);
           if (value.isPresent()) {
             next = value.get();
             return true;
           }
         }
-
-        // Move on to the next key and start iterating over its records
-        while (sortedKeyIterator.hasNext()) {
-          currentRecordIterator = getRecordByKeyIteratorInternal(scanner, 
sortedKeyIterator.next(), writerSchema, readerSchema);
-          if (currentRecordIterator.hasNext()) {
-            Option<IndexedRecord> value = 
Option.of(currentRecordIterator.next());
-            if (value.isPresent()) {
-              next = value.get();
-              return true;
-            }
-          }
-        }
-
-        // No more keys or records
         return false;
       } catch (IOException e) {
         throw new HoodieIOException("unable to read next record from hfile ", 
e);
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
index e3743b5255a..f34034d0a35 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
@@ -28,7 +28,6 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -42,20 +41,14 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.Spliterator;
 import java.util.Spliterators;
-import java.util.TreeMap;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
 import static 
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
 import static org.apache.hudi.common.util.CollectionUtils.toStream;
 import static org.apache.hudi.io.hfile.TestHFileReader.KEY_CREATOR;
@@ -147,59 +140,4 @@ public class TestHoodieHBaseHFileReaderWriter extends 
TestHoodieHFileReaderWrite
         VALUE_CREATOR,
         uniqueKeys);
   }
-
-  /**
-   * Test HFile reader with duplicates.
-   * HFile can have duplicates in case of secondary index for instance.
-   */
-  @Test
-  public void testHFileReaderWriterWithDuplicates() throws Exception {
-    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, 
"/exampleSchema.avsc");
-    HoodieAvroHFileWriter writer = createWriter(avroSchema, false);
-    List<String> keys = new ArrayList<>();
-    Map<String, List<GenericRecord>> recordMap = new TreeMap<>();
-    for (int i = 0; i < 50; i++) {
-      // If i is a multiple of 10, select the previous key for duplication
-      String key = i != 0 && i % 10 == 0 ? String.format("%s%04d", "key", i - 
1) : String.format("%s%04d", "key", i);
-
-      // Create a list of records for each key to handle duplicates
-      if (!recordMap.containsKey(key)) {
-        recordMap.put(key, new ArrayList<>());
-      }
-
-      // Create the record
-      GenericRecord record = new GenericData.Record(avroSchema);
-      record.put("_row_key", key);
-      record.put("time", Integer.toString(RANDOM.nextInt()));
-      record.put("number", i);
-      writer.writeAvro(key, record);
-
-      // Add to the record map and key list
-      recordMap.get(key).add(record);
-      keys.add(key);
-    }
-    writer.close();
-
-    try (HoodieAvroHFileReaderImplBase hFileReader = 
(HoodieAvroHFileReaderImplBase)
-        createReader(HoodieTestUtils.getStorage(getFilePath()))) {
-      List<IndexedRecord> records = 
HoodieAvroHFileReaderImplBase.readAllRecords(hFileReader);
-      
assertEquals(recordMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
 records);
-    }
-
-    for (int i = 0; i < 2; i++) {
-      int randomRowstoFetch = 5 + RANDOM.nextInt(10);
-      Set<String> rowsToFetch = getRandomKeys(randomRowstoFetch, keys);
-
-      List<String> rowsList = new ArrayList<>(rowsToFetch);
-      Collections.sort(rowsList);
-
-      List<GenericRecord> expectedRecords = rowsList.stream().flatMap(row -> 
recordMap.get(row).stream()).collect(Collectors.toList());
-
-      try (HoodieAvroHFileReaderImplBase hFileReader = 
(HoodieAvroHFileReaderImplBase)
-          createReader(HoodieTestUtils.getStorage(getFilePath()))) {
-        List<GenericRecord> result = 
HoodieAvroHFileReaderImplBase.readRecords(hFileReader, rowsList).stream().map(r 
-> (GenericRecord) r).collect(Collectors.toList());
-        assertEquals(expectedRecords, result);
-      }
-    }
-  }
 }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
index caf09bb94e4..3ed3343e93c 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
@@ -476,7 +476,7 @@ public abstract class TestHoodieHFileReaderWriterBase 
extends TestHoodieReaderWr
         content, hfilePrefix, false, 
HFileBootstrapIndex.HoodieKVComparator.class, 4);
   }
 
-  Set<String> getRandomKeys(int count, List<String> keys) {
+  private Set<String> getRandomKeys(int count, List<String> keys) {
     Set<String> rowKeys = new HashSet<>();
     int totalKeys = keys.size();
     while (rowKeys.size() < count) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 4014cf1a078..fd4484b5db5 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -76,10 +76,8 @@ class RecordLevelIndexSupport(spark: SparkSession,
     val recordKeyLocationsMap = 
metadataTable.readRecordIndex(JavaConverters.seqAsJavaListConverter(recordKeys).asJava)
     val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
     val candidateFiles: mutable.Set[String] = mutable.Set.empty
-    for (locations <- 
JavaConverters.collectionAsScalaIterableConverter(recordKeyLocationsMap.values()).asScala)
 {
-      for (location <- 
JavaConverters.collectionAsScalaIterableConverter(locations).asScala) {
-        fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath)
-      }
+    for (location <- 
JavaConverters.collectionAsScalaIterableConverter(recordKeyLocationsMap.values()).asScala)
 {
+      fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath)
     }
     for (file <- allFiles) {
       val fileId = FSUtils.getFileIdFromFilePath(file)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
index 45b626f04bb..e0b0ebca7c8 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
@@ -80,11 +80,7 @@ class SecondaryIndexSupport(spark: SparkSession,
     val recordKeyLocationsMap = 
metadataTable.readSecondaryIndex(JavaConverters.seqAsJavaListConverter(secondaryKeys).asJava,
 secondaryIndexName)
     val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
     val candidateFiles: mutable.Set[String] = mutable.Set.empty
-    for (locations <- 
JavaConverters.collectionAsScalaIterableConverter(recordKeyLocationsMap.values()).asScala)
 {
-      for (location <- 
JavaConverters.collectionAsScalaIterableConverter(locations).asScala) {
-        fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath)
-      }
-    }
+    recordKeyLocationsMap.values().forEach(location => 
fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath))
 
     for (file <- allFiles) {
       val fileId = FSUtils.getFileIdFromFilePath(file)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index fd2b328bc2c..26d8d5519e3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1169,7 +1169,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
     HoodieTableMetadata metadataReader = HoodieTableMetadata.create(
         context, storage, writeConfig.getMetadataConfig(), 
writeConfig.getBasePath());
-    Map<String, List<HoodieRecordGlobalLocation>> result = metadataReader
+    Map<String, HoodieRecordGlobalLocation> result = metadataReader
         
.readRecordIndex(records1.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
     assertEquals(0, result.size(), "RI should not return entries that are 
rolled back.");
     result = metadataReader
@@ -2007,7 +2007,6 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       // 1 partition and 2 commits. total entries should be 2.
       assertEquals(result.size(), 2);
       result.forEach(entry -> {
-        // LOG.warn("Prefix search entries for record key col and first 
partition : " + entry.getRecordKey().toString() + " :: " + 
entry.getData().getColumnStatMetadata().get().toString());
         HoodieMetadataColumnStats metadataColumnStats = 
entry.getData().getColumnStatMetadata().get();
         // for commit time column, min max should be the same since we disable 
small files, every commit will create a new file
         assertEquals(metadataColumnStats.getMinValue(), 
metadataColumnStats.getMaxValue());
@@ -3512,9 +3511,9 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       // RI should have created mappings for all the records inserted above
       HoodieTableMetadata metadataReader = HoodieTableMetadata.create(
           context, storage, writeConfig.getMetadataConfig(), 
writeConfig.getBasePath());
-      Map<String, List<HoodieRecordGlobalLocation>> result = metadataReader
+      Map<String, HoodieRecordGlobalLocation> result = metadataReader
           
.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
-      assertEquals(allRecords.size(), (int) 
result.values().stream().flatMap(List::stream).count(), "RI should have mapping 
for all the records in firstCommit");
+      assertEquals(allRecords.size(), result.size(), "RI should have mapping 
for all the records in firstCommit");
 
       // Delete some records from each commit. This should also remove the RI 
mapping.
       recordsToDelete = firstBatchOfrecords.subList(0, 3);
@@ -3546,7 +3545,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
 
       // RI should not return mappings for deleted records
       metadataReader = HoodieTableMetadata.create(context, storage, 
writeConfig.getMetadataConfig(), writeConfig.getBasePath());
-      Map<String, List<HoodieRecordGlobalLocation>> result = 
metadataReader.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+      Map<String, HoodieRecordGlobalLocation> result = 
metadataReader.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
       assertEquals(allRecords.size() - keysToDelete.size(), result.size(), "RI 
should not have mapping for deleted records");
       result.keySet().forEach(mappingKey -> 
assertFalse(keysToDelete.contains(mappingKey), "RI should not have mapping for 
deleted records"));
 
@@ -3557,9 +3556,9 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       // New mappings should have been created for re-inserted records and 
should map to the new commit time
       metadataReader = HoodieTableMetadata.create(context, storage, 
writeConfig.getMetadataConfig(), writeConfig.getBasePath());
       result = 
metadataReader.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
-      assertEquals(allRecords.size(), (int) 
result.values().stream().flatMap(List::stream).count(), "RI should have 
mappings for re-inserted records");
+      assertEquals(allRecords.size(), result.size(), "RI should have mappings 
for re-inserted records");
       for (String reInsertedKey : keysToDelete) {
-        assertTrue(result.get(reInsertedKey).stream().anyMatch(location -> 
location.getInstantTime().equals(reinsertTime)), "RI mapping for re-inserted 
keys should have new commit time");
+        assertEquals(reinsertTime, result.get(reInsertedKey).getInstantTime(), 
"RI mapping for re-inserted keys should have new commit time");
       }
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 872bca957a7..cbd01eab0e2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -130,10 +130,7 @@ class RecordLevelIndexTestBase extends 
HoodieStatsIndexTestBase {
       val recordKey: String = row.getAs("_hoodie_record_key")
       val partitionPath: String = row.getAs("_hoodie_partition_path")
       val fileName: String = row.getAs("_hoodie_file_name")
-      val recordLocations = recordIndexMap.get(recordKey)
-      assertFalse(recordLocations.isEmpty)
-      // assuming no duplicate keys for now
-      val recordLocation = recordLocations.get(0)
+      val recordLocation = recordIndexMap.get(recordKey)
       assertEquals(partitionPath, recordLocation.getPartitionPath)
       assertTrue(fileName.startsWith(recordLocation.getFileId), fileName + " 
should start with " + recordLocation.getFileId)
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
index b71ba02b993..73ae93317c2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
@@ -186,10 +186,7 @@ class TestMetadataRecordIndex extends 
HoodieSparkClientTestBase {
       val recordKey: String = row.getAs("_hoodie_record_key")
       val partitionPath: String = row.getAs("_hoodie_partition_path")
       val fileName: String = row.getAs("_hoodie_file_name")
-      val recordLocations = recordIndexMap.get(recordKey)
-      assertFalse(recordLocations.isEmpty)
-      // assuming no duplicate keys for now
-      val recordLocation = recordLocations.get(0)
+      val recordLocation = recordIndexMap.get(recordKey)
       assertEquals(partitionPath, recordLocation.getPartitionPath)
       if (!writeConfig.inlineClusteringEnabled && 
!writeConfig.isAsyncClusteringEnabled) {
         // The file id changes after clustering, so only assert it for usual 
upsert and compaction operations

Reply via email to