This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0a3a58ee09c [HUDI-6431] Support update partition path in record-level
index (#9041)
0a3a58ee09c is described below
commit 0a3a58ee09cff816e47bafc63d760ba4de60e5a0
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu Jun 29 19:47:56 2023 -0700
[HUDI-6431] Support update partition path in record-level index (#9041)
---
.../org/apache/hudi/config/HoodieIndexConfig.java | 13 ++-
.../org/apache/hudi/config/HoodieWriteConfig.java | 6 +-
.../org/apache/hudi/index/HoodieIndexUtils.java | 128 ++++++++++++++++-----
.../apache/hudi/index/bloom/HoodieBloomIndex.java | 2 +-
.../hudi/index/bloom/HoodieGlobalBloomIndex.java | 51 ++------
.../hudi/index/bucket/HoodieBucketIndex.java | 5 +-
.../hudi/index/simple/HoodieGlobalSimpleIndex.java | 94 ++++-----------
.../hudi/index/simple/HoodieSimpleIndex.java | 6 +-
.../hudi/io/HoodieKeyLocationFetchHandle.java | 23 ++--
...arkConsistentBucketDuplicateUpdateStrategy.java | 6 +-
.../hudi/index/SparkMetadataTableRecordIndex.java | 49 +-------
.../index/bloom/TestHoodieGlobalBloomIndex.java | 2 +-
.../TestGlobalIndexEnableUpdatePartitions.java | 26 +++--
13 files changed, 196 insertions(+), 215 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 7c730def11b..c77b9780548 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -258,6 +258,12 @@ public class HoodieIndexConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Similar to " +
BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index.");
+ public static final ConfigProperty<String>
RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE = ConfigProperty
+ .key("hoodie.record.index.update.partition.path")
+ .defaultValue("false")
+ .markAdvanced()
+ .withDocumentation("Similar to " +
BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for record index.");
+
public static final ConfigProperty<String>
GLOBAL_INDEX_RECONCILE_PARALLELISM = ConfigProperty
.key("hoodie.global.index.reconcile.parallelism")
.defaultValue("60")
@@ -649,7 +655,7 @@ public class HoodieIndexConfig extends HoodieConfig {
return this;
}
- public Builder withBloomIndexUpdatePartitionPath(boolean
updatePartitionPath) {
+ public Builder withGlobalBloomIndexUpdatePartitionPath(boolean
updatePartitionPath) {
hoodieIndexConfig.setValue(BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE,
String.valueOf(updatePartitionPath));
return this;
}
@@ -679,6 +685,11 @@ public class HoodieIndexConfig extends HoodieConfig {
return this;
}
+ public Builder withRecordIndexUpdatePartitionPath(boolean
updatePartitionPath) {
+ hoodieIndexConfig.setValue(RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE,
String.valueOf(updatePartitionPath));
+ return this;
+ }
+
public Builder withGlobalIndexReconcileParallelism(int parallelism) {
hoodieIndexConfig.setValue(GLOBAL_INDEX_RECONCILE_PARALLELISM,
String.valueOf(parallelism));
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7b672abf241..bc964b3cfe8 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1913,7 +1913,7 @@ public class HoodieWriteConfig extends HoodieConfig {
return getInt(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET);
}
- public boolean getBloomIndexUpdatePartitionPath() {
+ public boolean getGlobalBloomIndexUpdatePartitionPath() {
return
getBoolean(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE);
}
@@ -1969,6 +1969,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(HoodieIndexConfig.RECORD_INDEX_USE_CACHING);
}
+ public boolean getRecordIndexUpdatePartitionPath() {
+ return
getBoolean(HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE);
+ }
+
/**
* storage properties.
*/
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 46ad232022d..24a4dc05d10 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -20,14 +20,19 @@ package org.apache.hudi.index;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -132,23 +137,34 @@ public class HoodieIndexUtils {
/**
* Get tagged record for the passed in {@link HoodieRecord}.
*
- * @param inputRecord instance of {@link HoodieRecord} for which tagging is
requested
- * @param location {@link HoodieRecordLocation} for the passed in {@link
HoodieRecord}
+ * @param record instance of {@link HoodieRecord} for which tagging is
requested
+ * @param location {@link HoodieRecordLocation} for the passed in {@link
HoodieRecord}
* @return the tagged {@link HoodieRecord}
*/
- public static <R> HoodieRecord<R> getTaggedRecord(HoodieRecord<R>
inputRecord, Option<HoodieRecordLocation> location) {
- HoodieRecord<R> record = inputRecord;
+ public static <R> HoodieRecord<R> tagAsNewRecordIfNeeded(HoodieRecord<R>
record, Option<HoodieRecordLocation> location) {
if (location.isPresent()) {
// When you have a record in multiple files in the same partition, then
<row key, record> collection
// will have 2 entries with the same exact in memory copy of the
HoodieRecord and the 2
// separate filenames that the record is found in. This will result in
setting
// currentLocation 2 times and it will fail the second time. So creating
a new in memory
// copy of the hoodie record.
- record = inputRecord.newInstance();
- record.unseal();
- record.setCurrentLocation(location.get());
- record.seal();
+ HoodieRecord<R> newRecord = record.newInstance();
+ newRecord.unseal();
+ newRecord.setCurrentLocation(location.get());
+ newRecord.seal();
+ return newRecord;
+ } else {
+ return record;
}
+ }
+
+ /**
+ * Tag the record to an existing location. Not creating any new instance.
+ */
+ public static <R> HoodieRecord<R> tagRecord(HoodieRecord<R> record,
HoodieRecordLocation location) {
+ record.unseal();
+ record.setCurrentLocation(location);
+ record.seal();
return record;
}
@@ -213,19 +229,16 @@ public class HoodieIndexUtils {
* @return {@link HoodieRecord}s that have the current location being set.
*/
private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
- HoodieData<Pair<String, HoodieRecordLocation>> partitionLocations,
HoodieWriteConfig config, HoodieTable hoodieTable) {
+ HoodieData<HoodieRecordGlobalLocation> partitionLocations,
HoodieWriteConfig config, HoodieTable hoodieTable) {
final Option<String> instantTime = hoodieTable
.getMetaClient()
.getCommitsTimeline()
.filterCompletedInstants()
.lastInstant()
.map(HoodieInstant::getTimestamp);
- return partitionLocations.flatMap(p -> {
- String partitionPath = p.getLeft();
- String fileId = p.getRight().getFileId();
- return new HoodieMergedReadHandle(config, instantTime, hoodieTable,
Pair.of(partitionPath, fileId))
- .getMergedRecords().iterator();
- });
+ return partitionLocations.flatMap(p
+ -> new HoodieMergedReadHandle(config, instantTime, hoodieTable,
Pair.of(p.getPartitionPath(), p.getFileId()))
+ .getMergedRecords().iterator());
}
/**
@@ -261,55 +274,108 @@ public class HoodieIndexUtils {
/**
* Merge tagged incoming records with existing records in case of partition
path updated.
*/
- public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates(
- HoodieData<Pair<HoodieRecord<R>, Option<Pair<String,
HoodieRecordLocation>>>> taggedHoodieRecords, HoodieWriteConfig config,
HoodieTable hoodieTable) {
+ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdatesIfNeeded(
+ HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>>
incomingRecordsAndLocations, HoodieWriteConfig config, HoodieTable hoodieTable)
{
// completely new records
- HoodieData<HoodieRecord<R>> newRecords = taggedHoodieRecords.filter(p ->
!p.getRight().isPresent()).map(Pair::getLeft);
- // the records tagged to existing base files
- HoodieData<HoodieRecord<R>> updatingRecords = taggedHoodieRecords.filter(p
-> p.getRight().isPresent()).map(Pair::getLeft)
+ HoodieData<HoodieRecord<R>> taggedNewRecords =
incomingRecordsAndLocations.filter(p ->
!p.getRight().isPresent()).map(Pair::getLeft);
+ // the records found in existing base files
+ HoodieData<HoodieRecord<R>> untaggedUpdatingRecords =
incomingRecordsAndLocations.filter(p ->
p.getRight().isPresent()).map(Pair::getLeft)
.distinctWithKey(HoodieRecord::getRecordKey,
config.getGlobalIndexReconcileParallelism());
// the tagging partitions and locations
- HoodieData<Pair<String, HoodieRecordLocation>> partitionLocations =
taggedHoodieRecords
+ HoodieData<HoodieRecordGlobalLocation> globalLocations =
incomingRecordsAndLocations
.filter(p -> p.getRight().isPresent())
.map(p -> p.getRight().get())
.distinct(config.getGlobalIndexReconcileParallelism());
// merged existing records with current locations being set
- HoodieData<HoodieRecord<R>> existingRecords =
getExistingRecords(partitionLocations, config, hoodieTable);
+ HoodieData<HoodieRecord<R>> existingRecords =
getExistingRecords(globalLocations, config, hoodieTable);
final HoodieRecordMerger recordMerger = config.getRecordMerger();
- HoodieData<HoodieRecord<R>> taggedUpdatingRecords =
updatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))
+ HoodieData<HoodieRecord<R>> taggedUpdatingRecords =
untaggedUpdatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))
.leftOuterJoin(existingRecords.mapToPair(r ->
Pair.of(r.getRecordKey(), r)))
.values().flatMap(entry -> {
HoodieRecord<R> incoming = entry.getLeft();
Option<HoodieRecord<R>> existingOpt = entry.getRight();
if (!existingOpt.isPresent()) {
// existing record not found (e.g., due to delete log not merged
to base file): tag as a new record
- return Collections.singletonList(getTaggedRecord(incoming,
Option.empty())).iterator();
+ return Collections.singletonList(incoming).iterator();
}
HoodieRecord<R> existing = existingOpt.get();
Schema writeSchema = new
Schema.Parser().parse(config.getWriteSchema());
if (incoming.isDelete(writeSchema, config.getProps())) {
// incoming is a delete: force tag the incoming to the old
partition
- return
Collections.singletonList(getTaggedRecord(incoming.newInstance(existing.getKey()),
Option.of(existing.getCurrentLocation()))).iterator();
+ return
Collections.singletonList(tagRecord(incoming.newInstance(existing.getKey()),
existing.getCurrentLocation())).iterator();
}
Option<HoodieRecord<R>> mergedOpt =
mergeIncomingWithExistingRecord(incoming, existing, writeSchema, config,
recordMerger);
if (!mergedOpt.isPresent()) {
// merge resulted in delete: force tag the incoming to the old
partition
- return
Collections.singletonList(getTaggedRecord(incoming.newInstance(existing.getKey()),
Option.of(existing.getCurrentLocation()))).iterator();
+ return
Collections.singletonList(tagRecord(incoming.newInstance(existing.getKey()),
existing.getCurrentLocation())).iterator();
}
HoodieRecord<R> merged = mergedOpt.get();
if (Objects.equals(merged.getPartitionPath(),
existing.getPartitionPath())) {
// merged record has the same partition: route the merged result
to the current location as an update
- return Collections.singletonList(getTaggedRecord(merged,
Option.of(existing.getCurrentLocation()))).iterator();
+ return Collections.singletonList(tagRecord(merged,
existing.getCurrentLocation())).iterator();
} else {
// merged record has a different partition: issue a delete to the
old partition and insert the merged record to the new partition
HoodieRecord<R> deleteRecord = createDeleteRecord(config,
existing.getKey());
- deleteRecord.setCurrentLocation(existing.getCurrentLocation());
- deleteRecord.seal();
- return Arrays.asList(deleteRecord, getTaggedRecord(merged,
Option.empty())).iterator();
+ return Arrays.asList(tagRecord(deleteRecord,
existing.getCurrentLocation()), merged).iterator();
+ }
+ });
+ return taggedUpdatingRecords.union(taggedNewRecords);
+ }
+
+ public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+ HoodieData<HoodieRecord<R>> incomingRecords,
+ HoodiePairData<String, HoodieRecordGlobalLocation>
keyAndExistingLocations,
+ boolean mayContainDuplicateLookup,
+ boolean shouldUpdatePartitionPath,
+ HoodieWriteConfig config,
+ HoodieTable table) {
+ final HoodieRecordMerger merger = config.getRecordMerger();
+
+ HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+ incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(),
record));
+
+ // Pair of incoming record and the global location if meant for merged
lookup in later stage
+ HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>>
incomingRecordsAndLocations
+ = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+ .map(v -> {
+ final HoodieRecord<R> incomingRecord = v.getLeft();
+ Option<HoodieRecordGlobalLocation> currentLocOpt =
Option.ofNullable(v.getRight().orElse(null));
+ if (currentLocOpt.isPresent()) {
+ HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+ boolean shouldDoMergedLookUpThenTag = mayContainDuplicateLookup
+ || !Objects.equals(incomingRecord.getPartitionPath(),
currentLoc.getPartitionPath());
+ if (shouldUpdatePartitionPath && shouldDoMergedLookUpThenTag) {
+ // the pair's right side is a non-empty Option, which indicates
that a merged lookup will be performed
+ // at a later stage.
+ return Pair.of(incomingRecord, currentLocOpt);
+ } else {
+ // - When update partition path is set to false,
+ // the incoming record will be tagged to the existing record's
partition regardless of being equal or not.
+ // - When update partition path is set to true,
+ // the incoming record will be tagged to the existing record's
partition
+ // when partition is not updated and the look-up won't have
duplicates (e.g. COW, or using RLI).
+ return Pair.of(createNewTaggedHoodieRecord(incomingRecord,
currentLoc, merger.getRecordType()), Option.empty());
+ }
+ } else {
+ return Pair.of(incomingRecord, Option.empty());
}
});
- return taggedUpdatingRecords.union(newRecords);
+ return shouldUpdatePartitionPath
+ ? mergeForPartitionUpdatesIfNeeded(incomingRecordsAndLocations,
config, table)
+ : incomingRecordsAndLocations.map(Pair::getLeft);
+ }
+
+ public static <R> HoodieRecord<R>
createNewTaggedHoodieRecord(HoodieRecord<R> oldRecord,
HoodieRecordGlobalLocation location, HoodieRecordType recordType) {
+ switch (recordType) {
+ case AVRO:
+ HoodieKey recordKey = new HoodieKey(oldRecord.getRecordKey(),
location.getPartitionPath());
+ return tagRecord(new HoodieAvroRecord(recordKey, (HoodieRecordPayload)
oldRecord.getData()), location);
+ case SPARK:
+ return tagRecord(oldRecord.newInstance(), location);
+ default:
+ throw new HoodieIndexException("Unsupported record type: " +
recordType);
+ }
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index cca8516cb7f..eca347a75bc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -313,7 +313,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object,
Object> {
// Here as the records might have more data than keyFilenamePairs (some
row keys' fileId is null),
// so we do left outer join.
return keyRecordPairs.leftOuterJoin(keyFilenamePair).values()
- .map(v -> HoodieIndexUtils.getTaggedRecord(v.getLeft(),
Option.ofNullable(v.getRight().orElse(null))));
+ .map(v -> HoodieIndexUtils.tagAsNewRecordIfNeeded(v.getLeft(),
Option.ofNullable(v.getRight().orElse(null))));
}
@Override
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
index 39b602f4f4b..178bc6156e7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
@@ -23,25 +23,23 @@ import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.HoodieTable;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
-import static org.apache.hudi.index.HoodieIndexUtils.mergeForPartitionUpdates;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
+import static
org.apache.hudi.index.HoodieIndexUtils.tagGlobalLocationBackToRecords;
/**
* This filter will only work with hoodie table since it will only load
partitions
@@ -102,42 +100,13 @@ public class HoodieGlobalBloomIndex extends
HoodieBloomIndex {
HoodiePairData<HoodieKey, HoodieRecordLocation> keyLocationPairs,
HoodieData<HoodieRecord<R>> records,
HoodieTable hoodieTable) {
- final boolean shouldUpdatePartitionPath =
config.getBloomIndexUpdatePartitionPath() && hoodieTable.isPartitioned();
-
- HoodiePairData<String, HoodieRecord<R>> incomingRowKeyRecordPairs =
- records.mapToPair(record -> new ImmutablePair<>(record.getRecordKey(),
record));
-
- HoodiePairData<String, Pair<HoodieRecordLocation, HoodieKey>>
existingRecordKeyToRecordLocationHoodieKeyMap =
- keyLocationPairs.mapToPair(p -> new ImmutablePair<>(
- p.getKey().getRecordKey(), new ImmutablePair<>(p.getValue(),
p.getKey())));
-
- // Pair of a tagged record and the partition+location if tagged
- // Here as the records might have more data than rowKeys (some rowKeys'
fileId is null), so we do left outer join.
- HoodieData<Pair<HoodieRecord<R>, Option<Pair<String,
HoodieRecordLocation>>>> taggedRecordsAndLocationInfo =
incomingRowKeyRecordPairs
- .leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap)
- .values().map(record -> {
- final HoodieRecord<R> hoodieRecord = record.getLeft();
- final Option<Pair<HoodieRecordLocation, HoodieKey>>
recordLocationHoodieKeyPair = record.getRight();
- if (recordLocationHoodieKeyPair.isPresent()) {
- // Record key matched to file
- if (shouldUpdatePartitionPath) {
- Pair<HoodieRecordLocation, HoodieKey>
hoodieRecordLocationHoodieKeyPair = recordLocationHoodieKeyPair.get();
- return Pair.of(hoodieRecord,
Option.of(Pair.of(hoodieRecordLocationHoodieKeyPair.getRight().getPartitionPath(),
hoodieRecordLocationHoodieKeyPair.getLeft())));
- } else {
- // Ignore the incoming record's partition, regardless of whether
it differs from its old partition or not.
- // When it differs, the record will still be updated at its old
partition.
- return Pair.of(
- (HoodieRecord<R>) HoodieIndexUtils.getTaggedRecord(new
HoodieAvroRecord(recordLocationHoodieKeyPair.get().getRight(),
(HoodieRecordPayload) hoodieRecord.getData()),
-
Option.ofNullable(recordLocationHoodieKeyPair.get().getLeft())),
Option.empty());
- }
- } else {
- return Pair.of(HoodieIndexUtils.getTaggedRecord(hoodieRecord,
Option.empty()), Option.empty());
- }
- });
-
- return shouldUpdatePartitionPath
- ? mergeForPartitionUpdates(taggedRecordsAndLocationInfo, config,
hoodieTable)
- : taggedRecordsAndLocationInfo.map(Pair::getLeft);
+ HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations
= keyLocationPairs
+ .mapToPair(p -> Pair.of(p.getLeft().getRecordKey(),
+
HoodieRecordGlobalLocation.fromLocal(p.getLeft().getPartitionPath(),
p.getRight())));
+ boolean mayContainDuplicateLookup =
hoodieTable.getMetaClient().getTableType() == MERGE_ON_READ;
+ boolean shouldUpdatePartitionPath =
config.getGlobalBloomIndexUpdatePartitionPath() && hoodieTable.isPartitioned();
+ return tagGlobalLocationBackToRecords(records, keyAndExistingLocations,
+ mayContainDuplicateLookup, shouldUpdatePartitionPath, config,
hoodieTable);
}
@Override
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
index 321ccce9091..c8bf749aece 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
@@ -38,6 +37,8 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
+import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
+
/**
* Hash indexing mechanism.
*/
@@ -81,7 +82,7 @@ public abstract class HoodieBucketIndex extends
HoodieIndex<Object, Object> {
// TODO maybe batch the operation to improve performance
HoodieRecord record = inputItr.next();
Option<HoodieRecordLocation> loc =
mapper.getRecordLocation(record.getKey());
- return HoodieIndexUtils.getTaggedRecord(record, loc);
+ return tagAsNewRecordIfNeeded(record, loc);
}
},
false
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
index b2883fe962d..5256b036fde 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
@@ -23,26 +23,22 @@ import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLocationFetchHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
import java.util.List;
+import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
import static
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
-import static org.apache.hudi.index.HoodieIndexUtils.mergeForPartitionUpdates;
+import static
org.apache.hudi.index.HoodieIndexUtils.tagGlobalLocationBackToRecords;
/**
* A global simple index which reads interested fields(record key and
partition path) from base files and
@@ -72,32 +68,30 @@ public class HoodieGlobalSimpleIndex extends
HoodieSimpleIndex {
protected <R> HoodieData<HoodieRecord<R>> tagLocationInternal(
HoodieData<HoodieRecord<R>> inputRecords, HoodieEngineContext context,
HoodieTable hoodieTable) {
-
- HoodiePairData<String, HoodieRecord<R>> keyedInputRecords =
- inputRecords.mapToPair(entry -> new
ImmutablePair<>(entry.getRecordKey(), entry));
- HoodiePairData<HoodieKey, HoodieRecordLocation> allRecordLocationsInTable =
- fetchAllRecordLocations(context, hoodieTable,
config.getGlobalSimpleIndexParallelism());
- return getTaggedRecords(keyedInputRecords, allRecordLocationsInTable,
hoodieTable);
+ List<Pair<String, HoodieBaseFile>> latestBaseFiles =
getAllBaseFilesInTable(context, hoodieTable);
+ HoodiePairData<String, HoodieRecordGlobalLocation> allKeysAndLocations =
+ fetchRecordGlobalLocations(context, hoodieTable,
config.getGlobalSimpleIndexParallelism(), latestBaseFiles);
+ boolean mayContainDuplicateLookup =
hoodieTable.getMetaClient().getTableType() == MERGE_ON_READ;
+ boolean shouldUpdatePartitionPath =
config.getGlobalSimpleIndexUpdatePartitionPath() && hoodieTable.isPartitioned();
+ return tagGlobalLocationBackToRecords(inputRecords, allKeysAndLocations,
+ mayContainDuplicateLookup, shouldUpdatePartitionPath, config,
hoodieTable);
}
- /**
- * Fetch record locations for passed in {@link HoodieKey}s.
- *
- * @param context instance of {@link HoodieEngineContext} to use
- * @param hoodieTable instance of {@link HoodieTable} of interest
- * @param parallelism parallelism to use
- * @return {@link HoodiePairData} of {@link HoodieKey} and {@link
HoodieRecordLocation}
- */
- protected HoodiePairData<HoodieKey, HoodieRecordLocation>
fetchAllRecordLocations(
- HoodieEngineContext context, HoodieTable hoodieTable, int parallelism) {
- List<Pair<String, HoodieBaseFile>> latestBaseFiles =
getAllBaseFilesInTable(context, hoodieTable);
- return fetchRecordLocations(context, hoodieTable, parallelism,
latestBaseFiles);
+ private HoodiePairData<String, HoodieRecordGlobalLocation>
fetchRecordGlobalLocations(
+ HoodieEngineContext context, HoodieTable hoodieTable, int parallelism,
+ List<Pair<String, HoodieBaseFile>> baseFiles) {
+ int fetchParallelism = Math.max(1, Math.min(baseFiles.size(),
parallelism));
+
+ return context.parallelize(baseFiles, fetchParallelism)
+ .flatMap(partitionPathBaseFile -> new
HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile,
keyGeneratorOpt)
+ .globalLocations().iterator())
+ .mapToPair(e -> (Pair<String, HoodieRecordGlobalLocation>) e);
}
/**
* Load all files for all partitions as <Partition, filename> pair data.
*/
- protected List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(
+ private List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(
final HoodieEngineContext context, final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context,
config.getMetadataConfig(), metaClient.getBasePath());
@@ -105,52 +99,6 @@ public class HoodieGlobalSimpleIndex extends
HoodieSimpleIndex {
return getLatestBaseFilesForAllPartitions(allPartitionPaths, context,
hoodieTable);
}
- /**
- * Tag records with right {@link HoodieRecordLocation}.
- *
- * @param incomingRecords incoming {@link HoodieRecord}s
- * @param existingRecords existing records with {@link HoodieRecordLocation}s
- * @return {@link HoodieData} of {@link HoodieRecord}s with tagged {@link
HoodieRecordLocation}s
- */
- @VisibleForTesting
- <R> HoodieData<HoodieRecord<R>> getTaggedRecords(
- HoodiePairData<String, HoodieRecord<R>> incomingRecords,
- HoodiePairData<HoodieKey, HoodieRecordLocation> existingRecords,
- HoodieTable hoodieTable) {
- final boolean shouldUpdatePartitionPath =
config.getGlobalSimpleIndexUpdatePartitionPath() && hoodieTable.isPartitioned();
-
- HoodiePairData<String, Pair<String, HoodieRecordLocation>>
existingRecordByRecordKey =
- existingRecords.mapToPair(
- entry -> new ImmutablePair<>(entry.getLeft().getRecordKey(),
- Pair.of(entry.getLeft().getPartitionPath(),
entry.getRight())));
-
- // Pair of a tagged record and the partition+location if tagged
- HoodieData<Pair<HoodieRecord<R>, Option<Pair<String,
HoodieRecordLocation>>>> taggedRecordsAndLocationInfo =
incomingRecords.leftOuterJoin(existingRecordByRecordKey).values()
- .map(entry -> {
- HoodieRecord<R> inputRecord = entry.getLeft();
- Option<Pair<String, HoodieRecordLocation>> partitionPathLocationPair
= Option.ofNullable(entry.getRight().orElse(null));
- if (partitionPathLocationPair.isPresent()) {
- String partitionPath = partitionPathLocationPair.get().getKey();
- HoodieRecordLocation location =
partitionPathLocationPair.get().getRight();
- if (shouldUpdatePartitionPath) {
- // The incoming record may need to be inserted to a new
partition; keep the location info for merging later.
- return Pair.of(inputRecord, partitionPathLocationPair);
- } else {
- // Ignore the incoming record's partition, regardless of whether
it differs from its old partition or not.
- // When it differs, the record will still be updated at its old
partition.
- HoodieRecord<R> newRecord = new HoodieAvroRecord(new
HoodieKey(inputRecord.getRecordKey(), partitionPath), (HoodieRecordPayload)
inputRecord.getData());
- return Pair.of(HoodieIndexUtils.getTaggedRecord(newRecord,
Option.of(location)), Option.empty());
- }
- } else {
- return Pair.of(HoodieIndexUtils.getTaggedRecord(inputRecord,
Option.empty()), Option.empty());
- }
- });
-
- return shouldUpdatePartitionPath
- ? mergeForPartitionUpdates(taggedRecordsAndLocationInfo, config,
hoodieTable)
- : taggedRecordsAndLocationInfo.map(Pair::getLeft);
- }
-
@Override
public boolean isGlobal() {
return true;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
index dbc49d0655f..cca7a43d1f9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
@@ -34,7 +34,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.HoodieKeyLocationFetchHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
@@ -42,6 +41,7 @@ import org.apache.hudi.table.HoodieTable;
import java.util.List;
import static
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
+import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
/**
* A simple index which reads interested fields(record key and partition path)
from base files and
@@ -50,7 +50,7 @@ import static
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPar
public class HoodieSimpleIndex
extends HoodieIndex<Object, Object> {
- private final Option<BaseKeyGenerator> keyGeneratorOpt;
+ protected final Option<BaseKeyGenerator> keyGeneratorOpt;
public HoodieSimpleIndex(HoodieWriteConfig config, Option<BaseKeyGenerator>
keyGeneratorOpt) {
super(config);
@@ -122,7 +122,7 @@ public class HoodieSimpleIndex
keyedInputRecords.leftOuterJoin(existingLocationsOnTable).map(entry ->
{
final HoodieRecord<R> untaggedRecord = entry.getRight().getLeft();
final Option<HoodieRecordLocation> location =
Option.ofNullable(entry.getRight().getRight().orElse(null));
- return HoodieIndexUtils.getTaggedRecord(untaggedRecord, location);
+ return tagAsNewRecordIfNeeded(untaggedRecord, location);
});
if (config.getSimpleIndexUseCaching()) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index f0625303687..ae643b80cbc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -20,6 +20,7 @@ package org.apache.hudi.io;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
@@ -30,7 +31,6 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.Path;
-import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
@@ -51,17 +51,26 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O>
extends HoodieReadHandle<T
this.keyGeneratorOpt = keyGeneratorOpt;
}
- public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
- HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
+ private List<HoodieKey> fetchHoodieKeys(HoodieBaseFile baseFile) {
BaseFileUtils baseFileUtils =
BaseFileUtils.getInstance(baseFile.getPath());
- List<HoodieKey> hoodieKeyList = new ArrayList<>();
if (keyGeneratorOpt.isPresent()) {
- hoodieKeyList =
baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new
Path(baseFile.getPath()), keyGeneratorOpt);
+ return baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new
Path(baseFile.getPath()), keyGeneratorOpt);
} else {
- hoodieKeyList =
baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new
Path(baseFile.getPath()));
+ return baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new
Path(baseFile.getPath()));
}
- return hoodieKeyList.stream()
+ }
+
+ public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
+ HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
+ return fetchHoodieKeys(baseFile).stream()
.map(entry -> Pair.of(entry,
new HoodieRecordLocation(baseFile.getCommitTime(),
baseFile.getFileId())));
}
+
+ public Stream<Pair<String, HoodieRecordGlobalLocation>> globalLocations() {
+ HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
+ return fetchHoodieKeys(baseFile).stream()
+ .map(entry -> Pair.of(entry.getRecordKey(),
+ new HoodieRecordGlobalLocation(entry.getPartitionPath(),
baseFile.getCommitTime(), baseFile.getFileId())));
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
index 7d13a5940b4..99af0a14819 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
@@ -56,7 +56,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
-import static org.apache.hudi.index.HoodieIndexUtils.getTaggedRecord;
+import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
/**
* Update strategy for (consistent hashing) bucket index
@@ -109,8 +109,8 @@ public class SparkConsistentBucketDuplicateUpdateStrategy<T
extends HoodieRecord
List<String> indexKeyFields =
Arrays.asList(table.getConfig().getBucketIndexHashField().split(","));
HoodieData<HoodieRecord<T>> redirectedRecordsRDD =
filteredRecordsRDD.map(r -> {
ConsistentHashingNode node =
partitionToIdentifier.get(r.getPartitionPath()).getBucket(r.getKey(),
indexKeyFields);
- return getTaggedRecord(new HoodieAvroRecord(r.getKey(), r.getData(),
r.getOperation()),
- Option.ofNullable(new
HoodieRecordLocation(partitionToInstant.get(r.getPartitionPath()),
FSUtils.createNewFileId(node.getFileIdPrefix(), 0))));
+ return tagAsNewRecordIfNeeded(new HoodieAvroRecord(r.getKey(),
r.getData(), r.getOperation()),
+ Option.of(new
HoodieRecordLocation(partitionToInstant.get(r.getPartitionPath()),
FSUtils.createNewFileId(node.getFileIdPrefix(), 0))));
});
// Return combined iterator (the original and records with new location)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
index 452f1ce20bf..7811d02f2b5 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
@@ -23,15 +23,9 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieSparkRecord;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
@@ -55,6 +49,8 @@ import java.util.Map;
import scala.Tuple2;
+import static
org.apache.hudi.index.HoodieIndexUtils.tagGlobalLocationBackToRecords;
+
/**
* Hoodie Index implementation backed by the record index present in the
Metadata Table.
*/
@@ -106,11 +102,13 @@ public class SparkMetadataTableRecordIndex extends
HoodieIndex<Object, Object> {
ValidationUtils.checkState(partitionedKeyRDD.getNumPartitions() <=
numFileGroups);
// Lookup the keys in the record index
- HoodiePairData<String, HoodieRecordGlobalLocation> keyToLocationPairRDD =
+ HoodiePairData<String, HoodieRecordGlobalLocation> keyAndExistingLocations
=
HoodieJavaPairRDD.of(partitionedKeyRDD.mapPartitionsToPair(new
RecordIndexFileGroupLookupFunction(hoodieTable)));
// Tag the incoming records, as inserts or updates, by joining with
existing record keys
- HoodieData<HoodieRecord<R>> taggedRecords =
tagLocationBackToRecords(keyToLocationPairRDD, records);
+ boolean shouldUpdatePartitionPath =
config.getRecordIndexUpdatePartitionPath() && hoodieTable.isPartitioned();
+ HoodieData<HoodieRecord<R>> taggedRecords =
tagGlobalLocationBackToRecords(records, keyAndExistingLocations,
+ false, shouldUpdatePartitionPath, config, hoodieTable);
// The number of partitions in the taggedRecords is expected to be the
maximum of the partitions in
// keyToLocationPairRDD and records RDD.
@@ -153,41 +151,6 @@ public class SparkMetadataTableRecordIndex extends
HoodieIndex<Object, Object> {
return false;
}
- private <R> HoodieData<HoodieRecord<R>> tagLocationBackToRecords(
- HoodiePairData<String, HoodieRecordGlobalLocation> keyFilenamePair,
- HoodieData<HoodieRecord<R>> records) {
- HoodiePairData<String, HoodieRecord<R>> keyRecordPairs =
- records.mapToPair(record -> ImmutablePair.of(record.getRecordKey(),
record));
- // Here as the records might have more data than keyFilenamePairs (some
row keys' not found in record index),
- // we will do left outer join.
- return keyRecordPairs.leftOuterJoin(keyFilenamePair).values()
- .map(v -> {
- HoodieRecord<R> record = v.getLeft();
- Option<HoodieRecordGlobalLocation> location =
Option.ofNullable(v.getRight().orElse(null));
- if (!location.isPresent()) {
- // No location found.
- return record;
- }
- // Ensure the partitionPath is also set correctly in the key
- if
(!record.getPartitionPath().equals(location.get().getPartitionPath())) {
- record = createNewHoodieRecord(record, location.get());
- }
-
- // Perform the tagging. Not using HoodieIndexUtils.getTaggedRecord
to prevent an additional copy which is not necessary for this index.
- record.unseal();
- record.setCurrentLocation(location.get());
- record.seal();
- return record;
- });
- }
-
- private HoodieRecord createNewHoodieRecord(HoodieRecord oldRecord,
HoodieRecordGlobalLocation location) {
- HoodieKey recordKey = new HoodieKey(oldRecord.getRecordKey(),
location.getPartitionPath());
- return config.getRecordMerger().getRecordType() ==
HoodieRecord.HoodieRecordType.AVRO
- ? new HoodieAvroRecord(recordKey, (HoodieRecordPayload)
oldRecord.getData())
- : ((HoodieSparkRecord) oldRecord).newInstance();
- }
-
/**
* Function that lookups a list of keys in a single shard of the record index
*/
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index aa96020fa93..77a06f8a359 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -228,7 +228,7 @@ public class TestHoodieGlobalBloomIndex extends
TestHoodieMetadataBase {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withIndexConfig(HoodieIndexConfig.newBuilder()
.withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM)
- .withBloomIndexUpdatePartitionPath(false)
+ .withGlobalBloomIndexUpdatePartitionPath(false)
.build())
.build();
HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config,
SparkHoodieBloomIndexHelper.getInstance());
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
index 3f49e6757fd..677e478ffb8 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.sql.Dataset;
@@ -53,6 +53,7 @@ import static
org.apache.hudi.common.testutils.HoodieAdaptablePayloadDataGenerat
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_BLOOM;
import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_SIMPLE;
+import static org.apache.hudi.index.HoodieIndex.IndexType.RECORD_INDEX;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -62,14 +63,16 @@ public class TestGlobalIndexEnableUpdatePartitions extends
SparkClientFunctional
return Stream.of(
Arguments.of(COPY_ON_WRITE, GLOBAL_SIMPLE),
Arguments.of(COPY_ON_WRITE, GLOBAL_BLOOM),
+ Arguments.of(COPY_ON_WRITE, RECORD_INDEX),
Arguments.of(MERGE_ON_READ, GLOBAL_SIMPLE),
- Arguments.of(MERGE_ON_READ, GLOBAL_BLOOM)
+ Arguments.of(MERGE_ON_READ, GLOBAL_BLOOM),
+ Arguments.of(MERGE_ON_READ, RECORD_INDEX)
);
}
@ParameterizedTest
@MethodSource("getTableTypeAndIndexType")
- public void testPartitionChanges(HoodieTableType tableType,
HoodieIndex.IndexType indexType) throws IOException {
+ public void testPartitionChanges(HoodieTableType tableType, IndexType
indexType) throws IOException {
final Class<?> payloadClass = DefaultHoodieRecordPayload.class;
HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType,
writeConfig.getProps());
@@ -127,7 +130,7 @@ public class TestGlobalIndexEnableUpdatePartitions extends
SparkClientFunctional
@ParameterizedTest
@MethodSource("getTableTypeAndIndexType")
- public void testUpdatePartitionsThenDelete(HoodieTableType tableType,
HoodieIndex.IndexType indexType) throws IOException {
+ public void testUpdatePartitionsThenDelete(HoodieTableType tableType,
IndexType indexType) throws IOException {
final Class<?> payloadClass = DefaultHoodieRecordPayload.class;
HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType,
writeConfig.getProps());
@@ -202,21 +205,28 @@ public class TestGlobalIndexEnableUpdatePartitions
extends SparkClientFunctional
df.unpersist();
}
- private HoodieWriteConfig getWriteConfig(Class<?> payloadClass,
HoodieIndex.IndexType indexType) {
+ private HoodieWriteConfig getWriteConfig(Class<?> payloadClass, IndexType
indexType) {
+ HoodieMetadataConfig.Builder metadataConfigBuilder =
HoodieMetadataConfig.newBuilder();
+ if (indexType == IndexType.RECORD_INDEX) {
+ metadataConfigBuilder.enable(true).withEnableRecordIndex(true);
+ } else {
+ metadataConfigBuilder.enable(false);
+ }
return getConfigBuilder(true)
.withProperties(getKeyGenProps(payloadClass))
.withParallelism(2, 2)
.withBulkInsertParallelism(2)
.withDeleteParallelism(2)
-
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+ .withMetadataConfig(metadataConfigBuilder.build())
.withIndexConfig(HoodieIndexConfig.newBuilder()
.withIndexType(indexType)
.bloomIndexParallelism(2)
.withSimpleIndexParallelism(2)
.withGlobalSimpleIndexParallelism(2)
.withGlobalIndexReconcileParallelism(2)
- .withBloomIndexUpdatePartitionPath(true)
- .withGlobalSimpleIndexUpdatePartitionPath(true).build())
+ .withGlobalBloomIndexUpdatePartitionPath(true)
+ .withGlobalSimpleIndexUpdatePartitionPath(true)
+ .withRecordIndexUpdatePartitionPath(true).build())
.withSchema(SCHEMA_STR)
.withPayloadConfig(HoodiePayloadConfig.newBuilder()
.fromProperties(getPayloadProps(payloadClass)).build())