This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a3a58ee09c [HUDI-6431] Support update partition path in record-level index (#9041)
0a3a58ee09c is described below

commit 0a3a58ee09cff816e47bafc63d760ba4de60e5a0
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu Jun 29 19:47:56 2023 -0700

    [HUDI-6431] Support update partition path in record-level index (#9041)
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  |  13 ++-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   6 +-
 .../org/apache/hudi/index/HoodieIndexUtils.java    | 128 ++++++++++++++++-----
 .../apache/hudi/index/bloom/HoodieBloomIndex.java  |   2 +-
 .../hudi/index/bloom/HoodieGlobalBloomIndex.java   |  51 ++------
 .../hudi/index/bucket/HoodieBucketIndex.java       |   5 +-
 .../hudi/index/simple/HoodieGlobalSimpleIndex.java |  94 ++++-----------
 .../hudi/index/simple/HoodieSimpleIndex.java       |   6 +-
 .../hudi/io/HoodieKeyLocationFetchHandle.java      |  23 ++--
 ...arkConsistentBucketDuplicateUpdateStrategy.java |   6 +-
 .../hudi/index/SparkMetadataTableRecordIndex.java  |  49 +-------
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |   2 +-
 .../TestGlobalIndexEnableUpdatePartitions.java     |  26 +++--
 13 files changed, 196 insertions(+), 215 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 7c730def11b..c77b9780548 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -258,6 +258,12 @@ public class HoodieIndexConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Similar to " + 
BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index.");
 
+  public static final ConfigProperty<String> 
RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE = ConfigProperty
+      .key("hoodie.record.index.update.partition.path")
+      .defaultValue("false")
+      .markAdvanced()
+      .withDocumentation("Similar to " + 
BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for record index.");
+
   public static final ConfigProperty<String> 
GLOBAL_INDEX_RECONCILE_PARALLELISM = ConfigProperty
       .key("hoodie.global.index.reconcile.parallelism")
       .defaultValue("60")
@@ -649,7 +655,7 @@ public class HoodieIndexConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withBloomIndexUpdatePartitionPath(boolean 
updatePartitionPath) {
+    public Builder withGlobalBloomIndexUpdatePartitionPath(boolean 
updatePartitionPath) {
       hoodieIndexConfig.setValue(BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE, 
String.valueOf(updatePartitionPath));
       return this;
     }
@@ -679,6 +685,11 @@ public class HoodieIndexConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withRecordIndexUpdatePartitionPath(boolean 
updatePartitionPath) {
+      hoodieIndexConfig.setValue(RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE, 
String.valueOf(updatePartitionPath));
+      return this;
+    }
+
     public Builder withGlobalIndexReconcileParallelism(int parallelism) {
       hoodieIndexConfig.setValue(GLOBAL_INDEX_RECONCILE_PARALLELISM, 
String.valueOf(parallelism));
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7b672abf241..bc964b3cfe8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1913,7 +1913,7 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET);
   }
 
-  public boolean getBloomIndexUpdatePartitionPath() {
+  public boolean getGlobalBloomIndexUpdatePartitionPath() {
     return 
getBoolean(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE);
   }
 
@@ -1969,6 +1969,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(HoodieIndexConfig.RECORD_INDEX_USE_CACHING);
   }
 
+  public boolean getRecordIndexUpdatePartitionPath() {
+    return 
getBoolean(HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE);
+  }
+
   /**
    * storage properties.
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 46ad232022d..24a4dc05d10 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -20,14 +20,19 @@ package org.apache.hudi.index;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -132,23 +137,34 @@ public class HoodieIndexUtils {
   /**
    * Get tagged record for the passed in {@link HoodieRecord}.
    *
-   * @param inputRecord instance of {@link HoodieRecord} for which tagging is 
requested
-   * @param location    {@link HoodieRecordLocation} for the passed in {@link 
HoodieRecord}
+   * @param record   instance of {@link HoodieRecord} for which tagging is 
requested
+   * @param location {@link HoodieRecordLocation} for the passed in {@link 
HoodieRecord}
    * @return the tagged {@link HoodieRecord}
    */
-  public static <R> HoodieRecord<R> getTaggedRecord(HoodieRecord<R> 
inputRecord, Option<HoodieRecordLocation> location) {
-    HoodieRecord<R> record = inputRecord;
+  public static <R> HoodieRecord<R> tagAsNewRecordIfNeeded(HoodieRecord<R> 
record, Option<HoodieRecordLocation> location) {
     if (location.isPresent()) {
       // When you have a record in multiple files in the same partition, then 
<row key, record> collection
       // will have 2 entries with the same exact in memory copy of the 
HoodieRecord and the 2
       // separate filenames that the record is found in. This will result in 
setting
       // currentLocation 2 times and it will fail the second time. So creating 
a new in memory
       // copy of the hoodie record.
-      record = inputRecord.newInstance();
-      record.unseal();
-      record.setCurrentLocation(location.get());
-      record.seal();
+      HoodieRecord<R> newRecord = record.newInstance();
+      newRecord.unseal();
+      newRecord.setCurrentLocation(location.get());
+      newRecord.seal();
+      return newRecord;
+    } else {
+      return record;
     }
+  }
+
+  /**
+   * Tag the record to an existing location. Not creating any new instance.
+   */
+  public static <R> HoodieRecord<R> tagRecord(HoodieRecord<R> record, 
HoodieRecordLocation location) {
+    record.unseal();
+    record.setCurrentLocation(location);
+    record.seal();
     return record;
   }
 
@@ -213,19 +229,16 @@ public class HoodieIndexUtils {
    * @return {@link HoodieRecord}s that have the current location being set.
    */
   private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
-      HoodieData<Pair<String, HoodieRecordLocation>> partitionLocations, 
HoodieWriteConfig config, HoodieTable hoodieTable) {
+      HoodieData<HoodieRecordGlobalLocation> partitionLocations, 
HoodieWriteConfig config, HoodieTable hoodieTable) {
     final Option<String> instantTime = hoodieTable
         .getMetaClient()
         .getCommitsTimeline()
         .filterCompletedInstants()
         .lastInstant()
         .map(HoodieInstant::getTimestamp);
-    return partitionLocations.flatMap(p -> {
-      String partitionPath = p.getLeft();
-      String fileId = p.getRight().getFileId();
-      return new HoodieMergedReadHandle(config, instantTime, hoodieTable, 
Pair.of(partitionPath, fileId))
-          .getMergedRecords().iterator();
-    });
+    return partitionLocations.flatMap(p
+        -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, 
Pair.of(p.getPartitionPath(), p.getFileId()))
+        .getMergedRecords().iterator());
   }
 
   /**
@@ -261,55 +274,108 @@ public class HoodieIndexUtils {
   /**
    * Merge tagged incoming records with existing records in case of partition 
path updated.
    */
-  public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates(
-      HoodieData<Pair<HoodieRecord<R>, Option<Pair<String, 
HoodieRecordLocation>>>> taggedHoodieRecords, HoodieWriteConfig config, 
HoodieTable hoodieTable) {
+  public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdatesIfNeeded(
+      HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> 
incomingRecordsAndLocations, HoodieWriteConfig config, HoodieTable hoodieTable) 
{
     // completely new records
-    HoodieData<HoodieRecord<R>> newRecords = taggedHoodieRecords.filter(p -> 
!p.getRight().isPresent()).map(Pair::getLeft);
-    // the records tagged to existing base files
-    HoodieData<HoodieRecord<R>> updatingRecords = taggedHoodieRecords.filter(p 
-> p.getRight().isPresent()).map(Pair::getLeft)
+    HoodieData<HoodieRecord<R>> taggedNewRecords = 
incomingRecordsAndLocations.filter(p -> 
!p.getRight().isPresent()).map(Pair::getLeft);
+    // the records found in existing base files
+    HoodieData<HoodieRecord<R>> untaggedUpdatingRecords = 
incomingRecordsAndLocations.filter(p -> 
p.getRight().isPresent()).map(Pair::getLeft)
         .distinctWithKey(HoodieRecord::getRecordKey, 
config.getGlobalIndexReconcileParallelism());
     // the tagging partitions and locations
-    HoodieData<Pair<String, HoodieRecordLocation>> partitionLocations = 
taggedHoodieRecords
+    HoodieData<HoodieRecordGlobalLocation> globalLocations = 
incomingRecordsAndLocations
         .filter(p -> p.getRight().isPresent())
         .map(p -> p.getRight().get())
         .distinct(config.getGlobalIndexReconcileParallelism());
     // merged existing records with current locations being set
-    HoodieData<HoodieRecord<R>> existingRecords = 
getExistingRecords(partitionLocations, config, hoodieTable);
+    HoodieData<HoodieRecord<R>> existingRecords = 
getExistingRecords(globalLocations, config, hoodieTable);
 
     final HoodieRecordMerger recordMerger = config.getRecordMerger();
-    HoodieData<HoodieRecord<R>> taggedUpdatingRecords = 
updatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))
+    HoodieData<HoodieRecord<R>> taggedUpdatingRecords = 
untaggedUpdatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))
         .leftOuterJoin(existingRecords.mapToPair(r -> 
Pair.of(r.getRecordKey(), r)))
         .values().flatMap(entry -> {
           HoodieRecord<R> incoming = entry.getLeft();
           Option<HoodieRecord<R>> existingOpt = entry.getRight();
           if (!existingOpt.isPresent()) {
             // existing record not found (e.g., due to delete log not merged 
to base file): tag as a new record
-            return Collections.singletonList(getTaggedRecord(incoming, 
Option.empty())).iterator();
+            return Collections.singletonList(incoming).iterator();
           }
           HoodieRecord<R> existing = existingOpt.get();
           Schema writeSchema = new 
Schema.Parser().parse(config.getWriteSchema());
           if (incoming.isDelete(writeSchema, config.getProps())) {
             // incoming is a delete: force tag the incoming to the old 
partition
-            return 
Collections.singletonList(getTaggedRecord(incoming.newInstance(existing.getKey()),
 Option.of(existing.getCurrentLocation()))).iterator();
+            return 
Collections.singletonList(tagRecord(incoming.newInstance(existing.getKey()), 
existing.getCurrentLocation())).iterator();
           }
 
           Option<HoodieRecord<R>> mergedOpt = 
mergeIncomingWithExistingRecord(incoming, existing, writeSchema, config, 
recordMerger);
           if (!mergedOpt.isPresent()) {
             // merge resulted in delete: force tag the incoming to the old 
partition
-            return 
Collections.singletonList(getTaggedRecord(incoming.newInstance(existing.getKey()),
 Option.of(existing.getCurrentLocation()))).iterator();
+            return 
Collections.singletonList(tagRecord(incoming.newInstance(existing.getKey()), 
existing.getCurrentLocation())).iterator();
           }
           HoodieRecord<R> merged = mergedOpt.get();
           if (Objects.equals(merged.getPartitionPath(), 
existing.getPartitionPath())) {
             // merged record has the same partition: route the merged result 
to the current location as an update
-            return Collections.singletonList(getTaggedRecord(merged, 
Option.of(existing.getCurrentLocation()))).iterator();
+            return Collections.singletonList(tagRecord(merged, 
existing.getCurrentLocation())).iterator();
           } else {
             // merged record has a different partition: issue a delete to the 
old partition and insert the merged record to the new partition
             HoodieRecord<R> deleteRecord = createDeleteRecord(config, 
existing.getKey());
-            deleteRecord.setCurrentLocation(existing.getCurrentLocation());
-            deleteRecord.seal();
-            return Arrays.asList(deleteRecord, getTaggedRecord(merged, 
Option.empty())).iterator();
+            return Arrays.asList(tagRecord(deleteRecord, 
existing.getCurrentLocation()), merged).iterator();
+          }
+        });
+    return taggedUpdatingRecords.union(taggedNewRecords);
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+      HoodieData<HoodieRecord<R>> incomingRecords,
+      HoodiePairData<String, HoodieRecordGlobalLocation> 
keyAndExistingLocations,
+      boolean mayContainDuplicateLookup,
+      boolean shouldUpdatePartitionPath,
+      HoodieWriteConfig config,
+      HoodieTable table) {
+    final HoodieRecordMerger merger = config.getRecordMerger();
+
+    HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+        incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(), 
record));
+
+    // Pair of incoming record and the global location if meant for merged 
lookup in later stage
+    HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> 
incomingRecordsAndLocations
+        = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+        .map(v -> {
+          final HoodieRecord<R> incomingRecord = v.getLeft();
+          Option<HoodieRecordGlobalLocation> currentLocOpt = 
Option.ofNullable(v.getRight().orElse(null));
+          if (currentLocOpt.isPresent()) {
+            HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+            boolean shouldDoMergedLookUpThenTag = mayContainDuplicateLookup
+                || !Objects.equals(incomingRecord.getPartitionPath(), 
currentLoc.getPartitionPath());
+            if (shouldUpdatePartitionPath && shouldDoMergedLookUpThenTag) {
+              // the pair's right side is a non-empty Option, which indicates 
that a merged lookup will be performed
+              // at a later stage.
+              return Pair.of(incomingRecord, currentLocOpt);
+            } else {
+              // - When update partition path is set to false,
+              //   the incoming record will be tagged to the existing record's 
partition regardless of being equal or not.
+              // - When update partition path is set to true,
+              //   the incoming record will be tagged to the existing record's 
partition
+              //   when partition is not updated and the look-up won't have 
duplicates (e.g. COW, or using RLI).
+              return Pair.of(createNewTaggedHoodieRecord(incomingRecord, 
currentLoc, merger.getRecordType()), Option.empty());
+            }
+          } else {
+            return Pair.of(incomingRecord, Option.empty());
           }
         });
-    return taggedUpdatingRecords.union(newRecords);
+    return shouldUpdatePartitionPath
+        ? mergeForPartitionUpdatesIfNeeded(incomingRecordsAndLocations, 
config, table)
+        : incomingRecordsAndLocations.map(Pair::getLeft);
+  }
+
+  public static <R> HoodieRecord<R> 
createNewTaggedHoodieRecord(HoodieRecord<R> oldRecord, 
HoodieRecordGlobalLocation location, HoodieRecordType recordType) {
+    switch (recordType) {
+      case AVRO:
+        HoodieKey recordKey = new HoodieKey(oldRecord.getRecordKey(), 
location.getPartitionPath());
+        return tagRecord(new HoodieAvroRecord(recordKey, (HoodieRecordPayload) 
oldRecord.getData()), location);
+      case SPARK:
+        return tagRecord(oldRecord.newInstance(), location);
+      default:
+        throw new HoodieIndexException("Unsupported record type: " + 
recordType);
+    }
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index cca8516cb7f..eca347a75bc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -313,7 +313,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
     // Here as the records might have more data than keyFilenamePairs (some 
row keys' fileId is null),
     // so we do left outer join.
     return keyRecordPairs.leftOuterJoin(keyFilenamePair).values()
-        .map(v -> HoodieIndexUtils.getTaggedRecord(v.getLeft(), 
Option.ofNullable(v.getRight().orElse(null))));
+        .map(v -> HoodieIndexUtils.tagAsNewRecordIfNeeded(v.getLeft(), 
Option.ofNullable(v.getRight().orElse(null))));
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
index 39b602f4f4b..178bc6156e7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
@@ -23,25 +23,23 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
 
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.index.HoodieIndexUtils.mergeForPartitionUpdates;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
+import static 
org.apache.hudi.index.HoodieIndexUtils.tagGlobalLocationBackToRecords;
 
 /**
  * This filter will only work with hoodie table since it will only load 
partitions
@@ -102,42 +100,13 @@ public class HoodieGlobalBloomIndex extends HoodieBloomIndex {
       HoodiePairData<HoodieKey, HoodieRecordLocation> keyLocationPairs,
       HoodieData<HoodieRecord<R>> records,
       HoodieTable hoodieTable) {
-    final boolean shouldUpdatePartitionPath = 
config.getBloomIndexUpdatePartitionPath() && hoodieTable.isPartitioned();
-
-    HoodiePairData<String, HoodieRecord<R>> incomingRowKeyRecordPairs =
-        records.mapToPair(record -> new ImmutablePair<>(record.getRecordKey(), 
record));
-
-    HoodiePairData<String, Pair<HoodieRecordLocation, HoodieKey>> 
existingRecordKeyToRecordLocationHoodieKeyMap =
-        keyLocationPairs.mapToPair(p -> new ImmutablePair<>(
-            p.getKey().getRecordKey(), new ImmutablePair<>(p.getValue(), 
p.getKey())));
-
-    // Pair of a tagged record and the partition+location if tagged
-    // Here as the records might have more data than rowKeys (some rowKeys' 
fileId is null), so we do left outer join.
-    HoodieData<Pair<HoodieRecord<R>, Option<Pair<String, 
HoodieRecordLocation>>>> taggedRecordsAndLocationInfo = 
incomingRowKeyRecordPairs
-        .leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap)
-        .values().map(record -> {
-          final HoodieRecord<R> hoodieRecord = record.getLeft();
-          final Option<Pair<HoodieRecordLocation, HoodieKey>> 
recordLocationHoodieKeyPair = record.getRight();
-          if (recordLocationHoodieKeyPair.isPresent()) {
-            // Record key matched to file
-            if (shouldUpdatePartitionPath) {
-              Pair<HoodieRecordLocation, HoodieKey> 
hoodieRecordLocationHoodieKeyPair = recordLocationHoodieKeyPair.get();
-              return Pair.of(hoodieRecord, 
Option.of(Pair.of(hoodieRecordLocationHoodieKeyPair.getRight().getPartitionPath(),
 hoodieRecordLocationHoodieKeyPair.getLeft())));
-            } else {
-              // Ignore the incoming record's partition, regardless of whether 
it differs from its old partition or not.
-              // When it differs, the record will still be updated at its old 
partition.
-              return Pair.of(
-                  (HoodieRecord<R>) HoodieIndexUtils.getTaggedRecord(new 
HoodieAvroRecord(recordLocationHoodieKeyPair.get().getRight(), 
(HoodieRecordPayload) hoodieRecord.getData()),
-                      
Option.ofNullable(recordLocationHoodieKeyPair.get().getLeft())), 
Option.empty());
-            }
-          } else {
-            return Pair.of(HoodieIndexUtils.getTaggedRecord(hoodieRecord, 
Option.empty()), Option.empty());
-          }
-        });
-
-    return shouldUpdatePartitionPath
-        ? mergeForPartitionUpdates(taggedRecordsAndLocationInfo, config, 
hoodieTable)
-        : taggedRecordsAndLocationInfo.map(Pair::getLeft);
+    HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations 
= keyLocationPairs
+        .mapToPair(p -> Pair.of(p.getLeft().getRecordKey(),
+            
HoodieRecordGlobalLocation.fromLocal(p.getLeft().getPartitionPath(), 
p.getRight())));
+    boolean mayContainDuplicateLookup = 
hoodieTable.getMetaClient().getTableType() == MERGE_ON_READ;
+    boolean shouldUpdatePartitionPath = 
config.getGlobalBloomIndexUpdatePartitionPath() && hoodieTable.isPartitioned();
+    return tagGlobalLocationBackToRecords(records, keyAndExistingLocations,
+        mayContainDuplicateLookup, shouldUpdatePartitionPath, config, 
hoodieTable);
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
index 321ccce9091..c8bf749aece 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
 
 import org.slf4j.Logger;
@@ -38,6 +37,8 @@ import org.slf4j.LoggerFactory;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
+
 /**
  * Hash indexing mechanism.
  */
@@ -81,7 +82,7 @@ public abstract class HoodieBucketIndex extends HoodieIndex<Object, Object> {
             // TODO maybe batch the operation to improve performance
             HoodieRecord record = inputItr.next();
             Option<HoodieRecordLocation> loc = 
mapper.getRecordLocation(record.getKey());
-            return HoodieIndexUtils.getTaggedRecord(record, loc);
+            return tagAsNewRecordIfNeeded(record, loc);
           }
         },
         false
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
index b2883fe962d..5256b036fde 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
@@ -23,26 +23,22 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLocationFetchHandle;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.HoodieTable;
 
 import java.util.List;
 
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
 import static 
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
-import static org.apache.hudi.index.HoodieIndexUtils.mergeForPartitionUpdates;
+import static 
org.apache.hudi.index.HoodieIndexUtils.tagGlobalLocationBackToRecords;
 
 /**
  * A global simple index which reads interested fields(record key and 
partition path) from base files and
@@ -72,32 +68,30 @@ public class HoodieGlobalSimpleIndex extends HoodieSimpleIndex {
   protected <R> HoodieData<HoodieRecord<R>> tagLocationInternal(
       HoodieData<HoodieRecord<R>> inputRecords, HoodieEngineContext context,
       HoodieTable hoodieTable) {
-
-    HoodiePairData<String, HoodieRecord<R>> keyedInputRecords =
-        inputRecords.mapToPair(entry -> new 
ImmutablePair<>(entry.getRecordKey(), entry));
-    HoodiePairData<HoodieKey, HoodieRecordLocation> allRecordLocationsInTable =
-        fetchAllRecordLocations(context, hoodieTable, 
config.getGlobalSimpleIndexParallelism());
-    return getTaggedRecords(keyedInputRecords, allRecordLocationsInTable, 
hoodieTable);
+    List<Pair<String, HoodieBaseFile>> latestBaseFiles = 
getAllBaseFilesInTable(context, hoodieTable);
+    HoodiePairData<String, HoodieRecordGlobalLocation> allKeysAndLocations =
+        fetchRecordGlobalLocations(context, hoodieTable, 
config.getGlobalSimpleIndexParallelism(), latestBaseFiles);
+    boolean mayContainDuplicateLookup = 
hoodieTable.getMetaClient().getTableType() == MERGE_ON_READ;
+    boolean shouldUpdatePartitionPath = 
config.getGlobalSimpleIndexUpdatePartitionPath() && hoodieTable.isPartitioned();
+    return tagGlobalLocationBackToRecords(inputRecords, allKeysAndLocations,
+        mayContainDuplicateLookup, shouldUpdatePartitionPath, config, 
hoodieTable);
   }
 
-  /**
-   * Fetch record locations for passed in {@link HoodieKey}s.
-   *
-   * @param context     instance of {@link HoodieEngineContext} to use
-   * @param hoodieTable instance of {@link HoodieTable} of interest
-   * @param parallelism parallelism to use
-   * @return {@link HoodiePairData} of {@link HoodieKey} and {@link 
HoodieRecordLocation}
-   */
-  protected HoodiePairData<HoodieKey, HoodieRecordLocation> 
fetchAllRecordLocations(
-      HoodieEngineContext context, HoodieTable hoodieTable, int parallelism) {
-    List<Pair<String, HoodieBaseFile>> latestBaseFiles = 
getAllBaseFilesInTable(context, hoodieTable);
-    return fetchRecordLocations(context, hoodieTable, parallelism, 
latestBaseFiles);
+  private HoodiePairData<String, HoodieRecordGlobalLocation> 
fetchRecordGlobalLocations(
+      HoodieEngineContext context, HoodieTable hoodieTable, int parallelism,
+      List<Pair<String, HoodieBaseFile>> baseFiles) {
+    int fetchParallelism = Math.max(1, Math.min(baseFiles.size(), 
parallelism));
+
+    return context.parallelize(baseFiles, fetchParallelism)
+        .flatMap(partitionPathBaseFile -> new 
HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile, 
keyGeneratorOpt)
+            .globalLocations().iterator())
+        .mapToPair(e -> (Pair<String, HoodieRecordGlobalLocation>) e);
   }
 
   /**
    * Load all files for all partitions as <Partition, filename> pair data.
    */
-  protected List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(
+  private List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(
       final HoodieEngineContext context, final HoodieTable hoodieTable) {
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, 
config.getMetadataConfig(), metaClient.getBasePath());
@@ -105,52 +99,6 @@ public class HoodieGlobalSimpleIndex extends 
HoodieSimpleIndex {
     return getLatestBaseFilesForAllPartitions(allPartitionPaths, context, 
hoodieTable);
   }
 
-  /**
-   * Tag records with right {@link HoodieRecordLocation}.
-   *
-   * @param incomingRecords incoming {@link HoodieRecord}s
-   * @param existingRecords existing records with {@link HoodieRecordLocation}s
-   * @return {@link HoodieData} of {@link HoodieRecord}s with tagged {@link 
HoodieRecordLocation}s
-   */
-  @VisibleForTesting
-  <R> HoodieData<HoodieRecord<R>> getTaggedRecords(
-      HoodiePairData<String, HoodieRecord<R>> incomingRecords,
-      HoodiePairData<HoodieKey, HoodieRecordLocation> existingRecords,
-      HoodieTable hoodieTable) {
-    final boolean shouldUpdatePartitionPath = 
config.getGlobalSimpleIndexUpdatePartitionPath() && hoodieTable.isPartitioned();
-
-    HoodiePairData<String, Pair<String, HoodieRecordLocation>> 
existingRecordByRecordKey =
-        existingRecords.mapToPair(
-            entry -> new ImmutablePair<>(entry.getLeft().getRecordKey(),
-                Pair.of(entry.getLeft().getPartitionPath(), 
entry.getRight())));
-
-    // Pair of a tagged record and the partition+location if tagged
-    HoodieData<Pair<HoodieRecord<R>, Option<Pair<String, 
HoodieRecordLocation>>>> taggedRecordsAndLocationInfo = 
incomingRecords.leftOuterJoin(existingRecordByRecordKey).values()
-        .map(entry -> {
-          HoodieRecord<R> inputRecord = entry.getLeft();
-          Option<Pair<String, HoodieRecordLocation>> partitionPathLocationPair 
= Option.ofNullable(entry.getRight().orElse(null));
-          if (partitionPathLocationPair.isPresent()) {
-            String partitionPath = partitionPathLocationPair.get().getKey();
-            HoodieRecordLocation location = 
partitionPathLocationPair.get().getRight();
-            if (shouldUpdatePartitionPath) {
-              // The incoming record may need to be inserted to a new 
partition; keep the location info for merging later.
-              return Pair.of(inputRecord, partitionPathLocationPair);
-            } else {
-              // Ignore the incoming record's partition, regardless of whether 
it differs from its old partition or not.
-              // When it differs, the record will still be updated at its old 
partition.
-              HoodieRecord<R> newRecord = new HoodieAvroRecord(new 
HoodieKey(inputRecord.getRecordKey(), partitionPath), (HoodieRecordPayload) 
inputRecord.getData());
-              return Pair.of(HoodieIndexUtils.getTaggedRecord(newRecord, 
Option.of(location)), Option.empty());
-            }
-          } else {
-            return Pair.of(HoodieIndexUtils.getTaggedRecord(inputRecord, 
Option.empty()), Option.empty());
-          }
-        });
-
-    return shouldUpdatePartitionPath
-        ? mergeForPartitionUpdates(taggedRecordsAndLocationInfo, config, 
hoodieTable)
-        : taggedRecordsAndLocationInfo.map(Pair::getLeft);
-  }
-
   @Override
   public boolean isGlobal() {
     return true;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
index dbc49d0655f..cca7a43d1f9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
@@ -34,7 +34,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.io.HoodieKeyLocationFetchHandle;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.HoodieTable;
@@ -42,6 +41,7 @@ import org.apache.hudi.table.HoodieTable;
 import java.util.List;
 
 import static 
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
+import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
 
 /**
  * A simple index which reads interested fields(record key and partition path) 
from base files and
@@ -50,7 +50,7 @@ import static 
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPar
 public class HoodieSimpleIndex
     extends HoodieIndex<Object, Object> {
 
-  private final Option<BaseKeyGenerator> keyGeneratorOpt;
+  protected final Option<BaseKeyGenerator> keyGeneratorOpt;
 
   public HoodieSimpleIndex(HoodieWriteConfig config, Option<BaseKeyGenerator> 
keyGeneratorOpt) {
     super(config);
@@ -122,7 +122,7 @@ public class HoodieSimpleIndex
         keyedInputRecords.leftOuterJoin(existingLocationsOnTable).map(entry -> 
{
           final HoodieRecord<R> untaggedRecord = entry.getRight().getLeft();
           final Option<HoodieRecordLocation> location = 
Option.ofNullable(entry.getRight().getRight().orElse(null));
-          return HoodieIndexUtils.getTaggedRecord(untaggedRecord, location);
+          return tagAsNewRecordIfNeeded(untaggedRecord, location);
         });
 
     if (config.getSimpleIndexUseCaching()) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index f0625303687..ae643b80cbc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -20,6 +20,7 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.Option;
@@ -30,7 +31,6 @@ import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.fs.Path;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Stream;
 
@@ -51,17 +51,26 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> 
extends HoodieReadHandle<T
     this.keyGeneratorOpt = keyGeneratorOpt;
   }
 
-  public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
-    HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
+  private List<HoodieKey> fetchHoodieKeys(HoodieBaseFile baseFile) {
     BaseFileUtils baseFileUtils = 
BaseFileUtils.getInstance(baseFile.getPath());
-    List<HoodieKey> hoodieKeyList = new ArrayList<>();
     if (keyGeneratorOpt.isPresent()) {
-      hoodieKeyList = 
baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new 
Path(baseFile.getPath()), keyGeneratorOpt);
+      return baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new 
Path(baseFile.getPath()), keyGeneratorOpt);
     } else {
-      hoodieKeyList = 
baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new 
Path(baseFile.getPath()));
+      return baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new 
Path(baseFile.getPath()));
     }
-    return hoodieKeyList.stream()
+  }
+
+  public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
+    HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
+    return fetchHoodieKeys(baseFile).stream()
         .map(entry -> Pair.of(entry,
             new HoodieRecordLocation(baseFile.getCommitTime(), 
baseFile.getFileId())));
   }
+
+  public Stream<Pair<String, HoodieRecordGlobalLocation>> globalLocations() {
+    HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
+    return fetchHoodieKeys(baseFile).stream()
+        .map(entry -> Pair.of(entry.getRecordKey(),
+            new HoodieRecordGlobalLocation(entry.getPartitionPath(), 
baseFile.getCommitTime(), baseFile.getFileId())));
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
index 7d13a5940b4..99af0a14819 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
@@ -56,7 +56,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
-import static org.apache.hudi.index.HoodieIndexUtils.getTaggedRecord;
+import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
 
 /**
  * Update strategy for (consistent hashing) bucket index
@@ -109,8 +109,8 @@ public class SparkConsistentBucketDuplicateUpdateStrategy<T 
extends HoodieRecord
     List<String> indexKeyFields = 
Arrays.asList(table.getConfig().getBucketIndexHashField().split(","));
     HoodieData<HoodieRecord<T>> redirectedRecordsRDD = 
filteredRecordsRDD.map(r -> {
       ConsistentHashingNode node = 
partitionToIdentifier.get(r.getPartitionPath()).getBucket(r.getKey(), 
indexKeyFields);
-      return getTaggedRecord(new HoodieAvroRecord(r.getKey(), r.getData(), 
r.getOperation()),
-          Option.ofNullable(new 
HoodieRecordLocation(partitionToInstant.get(r.getPartitionPath()), 
FSUtils.createNewFileId(node.getFileIdPrefix(), 0))));
+      return tagAsNewRecordIfNeeded(new HoodieAvroRecord(r.getKey(), 
r.getData(), r.getOperation()),
+          Option.of(new 
HoodieRecordLocation(partitionToInstant.get(r.getPartitionPath()), 
FSUtils.createNewFileId(node.getFileIdPrefix(), 0))));
     });
 
     // Return combined iterator (the original and records with new location)
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
index 452f1ce20bf..7811d02f2b5 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
@@ -23,15 +23,9 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieSparkRecord;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
@@ -55,6 +49,8 @@ import java.util.Map;
 
 import scala.Tuple2;
 
+import static 
org.apache.hudi.index.HoodieIndexUtils.tagGlobalLocationBackToRecords;
+
 /**
  * Hoodie Index implementation backed by the record index present in the 
Metadata Table.
  */
@@ -106,11 +102,13 @@ public class SparkMetadataTableRecordIndex extends 
HoodieIndex<Object, Object> {
     ValidationUtils.checkState(partitionedKeyRDD.getNumPartitions() <= 
numFileGroups);
 
     // Lookup the keys in the record index
-    HoodiePairData<String, HoodieRecordGlobalLocation> keyToLocationPairRDD =
+    HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations 
=
         HoodieJavaPairRDD.of(partitionedKeyRDD.mapPartitionsToPair(new 
RecordIndexFileGroupLookupFunction(hoodieTable)));
 
     // Tag the incoming records, as inserts or updates, by joining with 
existing record keys
-    HoodieData<HoodieRecord<R>> taggedRecords = 
tagLocationBackToRecords(keyToLocationPairRDD, records);
+    boolean shouldUpdatePartitionPath = 
config.getRecordIndexUpdatePartitionPath() && hoodieTable.isPartitioned();
+    HoodieData<HoodieRecord<R>> taggedRecords = 
tagGlobalLocationBackToRecords(records, keyAndExistingLocations,
+        false, shouldUpdatePartitionPath, config, hoodieTable);
 
     // The number of partitions in the taggedRecords is expected to be the 
maximum of the partitions in
     // keyToLocationPairRDD and records RDD.
@@ -153,41 +151,6 @@ public class SparkMetadataTableRecordIndex extends 
HoodieIndex<Object, Object> {
     return false;
   }
 
-  private <R> HoodieData<HoodieRecord<R>> tagLocationBackToRecords(
-      HoodiePairData<String, HoodieRecordGlobalLocation> keyFilenamePair,
-      HoodieData<HoodieRecord<R>> records) {
-    HoodiePairData<String, HoodieRecord<R>> keyRecordPairs =
-        records.mapToPair(record -> ImmutablePair.of(record.getRecordKey(), 
record));
-    // Here as the records might have more data than keyFilenamePairs (some 
row keys' not found in record index),
-    // we will do left outer join.
-    return keyRecordPairs.leftOuterJoin(keyFilenamePair).values()
-        .map(v -> {
-          HoodieRecord<R> record = v.getLeft();
-          Option<HoodieRecordGlobalLocation> location = 
Option.ofNullable(v.getRight().orElse(null));
-          if (!location.isPresent()) {
-            // No location found.
-            return record;
-          }
-          // Ensure the partitionPath is also set correctly in the key
-          if 
(!record.getPartitionPath().equals(location.get().getPartitionPath())) {
-            record = createNewHoodieRecord(record, location.get());
-          }
-
-          // Perform the tagging. Not using HoodieIndexUtils.getTaggedRecord 
to prevent an additional copy which is not necessary for this index.
-          record.unseal();
-          record.setCurrentLocation(location.get());
-          record.seal();
-          return record;
-        });
-  }
-
-  private HoodieRecord createNewHoodieRecord(HoodieRecord oldRecord, 
HoodieRecordGlobalLocation location) {
-    HoodieKey recordKey = new HoodieKey(oldRecord.getRecordKey(), 
location.getPartitionPath());
-    return config.getRecordMerger().getRecordType() == 
HoodieRecord.HoodieRecordType.AVRO
-        ? new HoodieAvroRecord(recordKey, (HoodieRecordPayload) 
oldRecord.getData())
-        : ((HoodieSparkRecord) oldRecord).newInstance();
-  }
-
   /**
    * Function that looks up a list of keys in a single shard of the record index
    */
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index aa96020fa93..77a06f8a359 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -228,7 +228,7 @@ public class TestHoodieGlobalBloomIndex extends 
TestHoodieMetadataBase {
     HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath)
         .withIndexConfig(HoodieIndexConfig.newBuilder()
             .withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM)
-            .withBloomIndexUpdatePartitionPath(false)
+            .withGlobalBloomIndexUpdatePartitionPath(false)
             .build())
         .build();
     HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config, 
SparkHoodieBloomIndexHelper.getInstance());
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
index 3f49e6757fd..677e478ffb8 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
 import org.apache.spark.sql.Dataset;
@@ -53,6 +53,7 @@ import static 
org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerat
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
 import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_BLOOM;
 import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_SIMPLE;
+import static org.apache.hudi.index.HoodieIndex.IndexType.RECORD_INDEX;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -62,14 +63,16 @@ public class TestGlobalIndexEnableUpdatePartitions extends 
SparkClientFunctional
     return Stream.of(
         Arguments.of(COPY_ON_WRITE, GLOBAL_SIMPLE),
         Arguments.of(COPY_ON_WRITE, GLOBAL_BLOOM),
+        Arguments.of(COPY_ON_WRITE, RECORD_INDEX),
         Arguments.of(MERGE_ON_READ, GLOBAL_SIMPLE),
-        Arguments.of(MERGE_ON_READ, GLOBAL_BLOOM)
+        Arguments.of(MERGE_ON_READ, GLOBAL_BLOOM),
+        Arguments.of(MERGE_ON_READ, RECORD_INDEX)
     );
   }
 
   @ParameterizedTest
   @MethodSource("getTableTypeAndIndexType")
-  public void testPartitionChanges(HoodieTableType tableType, 
HoodieIndex.IndexType indexType) throws IOException {
+  public void testPartitionChanges(HoodieTableType tableType, IndexType 
indexType) throws IOException {
     final Class<?> payloadClass = DefaultHoodieRecordPayload.class;
     HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
     HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, 
writeConfig.getProps());
@@ -127,7 +130,7 @@ public class TestGlobalIndexEnableUpdatePartitions extends 
SparkClientFunctional
 
   @ParameterizedTest
   @MethodSource("getTableTypeAndIndexType")
-  public void testUpdatePartitionsThenDelete(HoodieTableType tableType, 
HoodieIndex.IndexType indexType) throws IOException {
+  public void testUpdatePartitionsThenDelete(HoodieTableType tableType, 
IndexType indexType) throws IOException {
     final Class<?> payloadClass = DefaultHoodieRecordPayload.class;
     HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
     HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, 
writeConfig.getProps());
@@ -202,21 +205,28 @@ public class TestGlobalIndexEnableUpdatePartitions 
extends SparkClientFunctional
     df.unpersist();
   }
 
-  private HoodieWriteConfig getWriteConfig(Class<?> payloadClass, 
HoodieIndex.IndexType indexType) {
+  private HoodieWriteConfig getWriteConfig(Class<?> payloadClass, IndexType 
indexType) {
+    HoodieMetadataConfig.Builder metadataConfigBuilder = 
HoodieMetadataConfig.newBuilder();
+    if (indexType == IndexType.RECORD_INDEX) {
+      metadataConfigBuilder.enable(true).withEnableRecordIndex(true);
+    } else {
+      metadataConfigBuilder.enable(false);
+    }
     return getConfigBuilder(true)
         .withProperties(getKeyGenProps(payloadClass))
         .withParallelism(2, 2)
         .withBulkInsertParallelism(2)
         .withDeleteParallelism(2)
-        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withMetadataConfig(metadataConfigBuilder.build())
         .withIndexConfig(HoodieIndexConfig.newBuilder()
             .withIndexType(indexType)
             .bloomIndexParallelism(2)
             .withSimpleIndexParallelism(2)
             .withGlobalSimpleIndexParallelism(2)
             .withGlobalIndexReconcileParallelism(2)
-            .withBloomIndexUpdatePartitionPath(true)
-            .withGlobalSimpleIndexUpdatePartitionPath(true).build())
+            .withGlobalBloomIndexUpdatePartitionPath(true)
+            .withGlobalSimpleIndexUpdatePartitionPath(true)
+            .withRecordIndexUpdatePartitionPath(true).build())
         .withSchema(SCHEMA_STR)
         .withPayloadConfig(HoodiePayloadConfig.newBuilder()
             .fromProperties(getPayloadProps(payloadClass)).build())

Reply via email to