nsivabalan commented on code in PR #9041:
URL: https://github.com/apache/hudi/pull/9041#discussion_r1240924843
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdates(
return Arrays.asList(deleteRecord, getTaggedRecord(merged,
Option.empty())).iterator();
}
});
- return taggedUpdatingRecords.union(newRecords);
+ return taggedUpdatingRecords.union(taggedNewRecords);
+ }
+
+ public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+ HoodieData<HoodieRecord<R>> incomingRecords,
+ HoodiePairData<String, HoodieRecordGlobalLocation>
keyAndExistingLocations,
+ boolean mayContainDuplicateLookup,
+ boolean shouldUpdatePartitionPath,
+ HoodieWriteConfig config,
+ HoodieTable table) {
+ final HoodieRecordMerger merger = config.getRecordMerger();
+
+ HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+ incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(),
record));
+
+ // Pair of incoming record and the global location if meant for merged
lookup in later stage
+ HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>>
incomingRecordsAndLocations
+ = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+ .map(v -> {
+ final HoodieRecord<R> incomingRecord = v.getLeft();
+ Option<HoodieRecordGlobalLocation> currentLocOpt =
Option.ofNullable(v.getRight().orElse(null));
+ if (currentLocOpt.isPresent()) {
+ HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+ boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+ || !Objects.equals(incomingRecord.getPartitionPath(),
currentLoc.getPartitionPath());
+ if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+ return Pair.of(incomingRecord, currentLocOpt);
+ } else {
+ // - When update partition path is set to false,
+ // the incoming record will be tagged to the existing record's
partition regardless of being equal or not.
+ // - When update partition path is set to true,
+ // the incoming record will be tagged to the existing record's
partition
+ // when partition is not updated and the look-up won't have
duplicates (e.g. COW, or using RLI).
+ return Pair.of((HoodieRecord<R>) getTaggedRecord(
+ createNewHoodieRecord(incomingRecord, currentLoc,
merger), Option.of(currentLoc)),
+ Option.empty());
+ }
+ } else {
+ return Pair.of(getTaggedRecord(incomingRecord, Option.empty()),
Option.empty());
+ }
+ });
+ return shouldUpdatePartitionPath
+ ? mergeForPartitionUpdatesIfNeeded(incomingRecordsAndLocations,
config, table)
+ : incomingRecordsAndLocations.map(Pair::getLeft);
+ }
+
+ public static HoodieRecord createNewHoodieRecord(HoodieRecord oldRecord,
HoodieRecordGlobalLocation location, HoodieRecordMerger merger) {
+ HoodieKey recordKey = new HoodieKey(oldRecord.getRecordKey(),
location.getPartitionPath());
+ return merger.getRecordType() == HoodieRecordType.AVRO
Review Comment:
Shouldn't we set the location in the HoodieRecord before returning?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdates(
return Arrays.asList(deleteRecord, getTaggedRecord(merged,
Option.empty())).iterator();
}
});
- return taggedUpdatingRecords.union(newRecords);
+ return taggedUpdatingRecords.union(taggedNewRecords);
+ }
+
+ public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+ HoodieData<HoodieRecord<R>> incomingRecords,
+ HoodiePairData<String, HoodieRecordGlobalLocation>
keyAndExistingLocations,
+ boolean mayContainDuplicateLookup,
+ boolean shouldUpdatePartitionPath,
+ HoodieWriteConfig config,
+ HoodieTable table) {
+ final HoodieRecordMerger merger = config.getRecordMerger();
+
+ HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+ incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(),
record));
+
+ // Pair of incoming record and the global location if meant for merged
lookup in later stage
+ HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>>
incomingRecordsAndLocations
+ = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+ .map(v -> {
+ final HoodieRecord<R> incomingRecord = v.getLeft();
+ Option<HoodieRecordGlobalLocation> currentLocOpt =
Option.ofNullable(v.getRight().orElse(null));
+ if (currentLocOpt.isPresent()) {
+ HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+ boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+ || !Objects.equals(incomingRecord.getPartitionPath(),
currentLoc.getPartitionPath());
+ if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+ return Pair.of(incomingRecord, currentLocOpt);
+ } else {
+ // - When update partition path is set to false,
+ // the incoming record will be tagged to the existing record's
partition regardless of being equal or not.
+ // - When update partition path is set to true,
+ // the incoming record will be tagged to the existing record's
partition
+ // when partition is not updated and the look-up won't have
duplicates (e.g. COW, or using RLI).
+ return Pair.of((HoodieRecord<R>) getTaggedRecord(
+ createNewHoodieRecord(incomingRecord, currentLoc,
merger), Option.of(currentLoc)),
+ Option.empty());
+ }
+ } else {
+ return Pair.of(getTaggedRecord(incomingRecord, Option.empty()),
Option.empty());
Review Comment:
Maybe we don't even need to call getTaggedRecord; just
`Pair.of(incomingRecord, Option.empty())` would do.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdates(
return Arrays.asList(deleteRecord, getTaggedRecord(merged,
Option.empty())).iterator();
}
});
- return taggedUpdatingRecords.union(newRecords);
+ return taggedUpdatingRecords.union(taggedNewRecords);
+ }
+
+ public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+ HoodieData<HoodieRecord<R>> incomingRecords,
+ HoodiePairData<String, HoodieRecordGlobalLocation>
keyAndExistingLocations,
+ boolean mayContainDuplicateLookup,
+ boolean shouldUpdatePartitionPath,
+ HoodieWriteConfig config,
+ HoodieTable table) {
+ final HoodieRecordMerger merger = config.getRecordMerger();
+
+ HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+ incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(),
record));
+
+ // Pair of incoming record and the global location if meant for merged
lookup in later stage
+ HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>>
incomingRecordsAndLocations
+ = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+ .map(v -> {
+ final HoodieRecord<R> incomingRecord = v.getLeft();
+ Option<HoodieRecordGlobalLocation> currentLocOpt =
Option.ofNullable(v.getRight().orElse(null));
+ if (currentLocOpt.isPresent()) {
+ HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+ boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+ || !Objects.equals(incomingRecord.getPartitionPath(),
currentLoc.getPartitionPath());
+ if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+ return Pair.of(incomingRecord, currentLocOpt);
+ } else {
+ // - When update partition path is set to false,
+ // the incoming record will be tagged to the existing record's
partition regardless of being equal or not.
+ // - When update partition path is set to true,
+ // the incoming record will be tagged to the existing record's
partition
+ // when partition is not updated and the look-up won't have
duplicates (e.g. COW, or using RLI).
+ return Pair.of((HoodieRecord<R>) getTaggedRecord(
+ createNewHoodieRecord(incomingRecord, currentLoc,
merger), Option.of(currentLoc)),
Review Comment:
Minor: the last arg could just be `currentLocOpt`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdates(
return Arrays.asList(deleteRecord, getTaggedRecord(merged,
Option.empty())).iterator();
}
});
- return taggedUpdatingRecords.union(newRecords);
+ return taggedUpdatingRecords.union(taggedNewRecords);
+ }
+
+ public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+ HoodieData<HoodieRecord<R>> incomingRecords,
+ HoodiePairData<String, HoodieRecordGlobalLocation>
keyAndExistingLocations,
+ boolean mayContainDuplicateLookup,
+ boolean shouldUpdatePartitionPath,
+ HoodieWriteConfig config,
+ HoodieTable table) {
+ final HoodieRecordMerger merger = config.getRecordMerger();
+
+ HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+ incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(),
record));
+
+ // Pair of incoming record and the global location if meant for merged
lookup in later stage
+ HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>>
incomingRecordsAndLocations
+ = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+ .map(v -> {
+ final HoodieRecord<R> incomingRecord = v.getLeft();
+ Option<HoodieRecordGlobalLocation> currentLocOpt =
Option.ofNullable(v.getRight().orElse(null));
+ if (currentLocOpt.isPresent()) {
+ HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+ boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+ || !Objects.equals(incomingRecord.getPartitionPath(),
currentLoc.getPartitionPath());
+ if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+ return Pair.of(incomingRecord, currentLocOpt);
+ } else {
+ // - When update partition path is set to false,
+ // the incoming record will be tagged to the existing record's
partition regardless of being equal or not.
+ // - When update partition path is set to true,
+ // the incoming record will be tagged to the existing record's
partition
+ // when partition is not updated and the look-up won't have
duplicates (e.g. COW, or using RLI).
+ return Pair.of((HoodieRecord<R>) getTaggedRecord(
Review Comment:
Should this be:
```
createNewHoodieRecord(incomingRecord, currentLoc, merger), Option.empty())
```
i.e., with the last arg as `Option.empty()`, so that within
`mergeForPartitionUpdatesIfNeeded` we don't need to load those partition keys?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdates(
return Arrays.asList(deleteRecord, getTaggedRecord(merged,
Option.empty())).iterator();
}
});
- return taggedUpdatingRecords.union(newRecords);
+ return taggedUpdatingRecords.union(taggedNewRecords);
+ }
+
+ public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+ HoodieData<HoodieRecord<R>> incomingRecords,
+ HoodiePairData<String, HoodieRecordGlobalLocation>
keyAndExistingLocations,
+ boolean mayContainDuplicateLookup,
+ boolean shouldUpdatePartitionPath,
+ HoodieWriteConfig config,
+ HoodieTable table) {
+ final HoodieRecordMerger merger = config.getRecordMerger();
+
+ HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+ incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(),
record));
+
+ // Pair of incoming record and the global location if meant for merged
lookup in later stage
+ HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>>
incomingRecordsAndLocations
+ = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+ .map(v -> {
+ final HoodieRecord<R> incomingRecord = v.getLeft();
+ Option<HoodieRecordGlobalLocation> currentLocOpt =
Option.ofNullable(v.getRight().orElse(null));
+ if (currentLocOpt.isPresent()) {
+ HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+ boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+ || !Objects.equals(incomingRecord.getPartitionPath(),
currentLoc.getPartitionPath());
Review Comment:
Would this also work for the following scenario:
a record moved from p1 -> p2, and now we are getting a new batch where it
moves back to p1?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -310,6 +312,56 @@ public static <R> HoodieData<HoodieRecord<R>>
mergeForPartitionUpdates(
return Arrays.asList(deleteRecord, getTaggedRecord(merged,
Option.empty())).iterator();
}
});
- return taggedUpdatingRecords.union(newRecords);
+ return taggedUpdatingRecords.union(taggedNewRecords);
+ }
+
+ public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
+ HoodieData<HoodieRecord<R>> incomingRecords,
+ HoodiePairData<String, HoodieRecordGlobalLocation>
keyAndExistingLocations,
+ boolean mayContainDuplicateLookup,
+ boolean shouldUpdatePartitionPath,
+ HoodieWriteConfig config,
+ HoodieTable table) {
+ final HoodieRecordMerger merger = config.getRecordMerger();
+
+ HoodiePairData<String, HoodieRecord<R>> keyAndIncomingRecords =
+ incomingRecords.mapToPair(record -> Pair.of(record.getRecordKey(),
record));
+
+ // Pair of incoming record and the global location if meant for merged
lookup in later stage
+ HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>>
incomingRecordsAndLocations
+ = keyAndIncomingRecords.leftOuterJoin(keyAndExistingLocations).values()
+ .map(v -> {
+ final HoodieRecord<R> incomingRecord = v.getLeft();
+ Option<HoodieRecordGlobalLocation> currentLocOpt =
Option.ofNullable(v.getRight().orElse(null));
+ if (currentLocOpt.isPresent()) {
+ HoodieRecordGlobalLocation currentLoc = currentLocOpt.get();
+ boolean shouldPerformMergedLookUp = mayContainDuplicateLookup
+ || !Objects.equals(incomingRecord.getPartitionPath(),
currentLoc.getPartitionPath());
+ if (shouldUpdatePartitionPath && shouldPerformMergedLookUp) {
+ return Pair.of(incomingRecord, currentLocOpt);
+ } else {
+ // - When update partition path is set to false,
+ // the incoming record will be tagged to the existing record's
partition regardless of being equal or not.
+ // - When update partition path is set to true,
+ // the incoming record will be tagged to the existing record's
partition
+ // when partition is not updated and the look-up won't have
duplicates (e.g. COW, or using RLI).
+ return Pair.of((HoodieRecord<R>) getTaggedRecord(
+ createNewHoodieRecord(incomingRecord, currentLoc,
merger), Option.of(currentLoc)),
+ Option.empty());
+ }
+ } else {
+ return Pair.of(getTaggedRecord(incomingRecord, Option.empty()),
Option.empty());
+ }
+ });
+ return shouldUpdatePartitionPath
+ ? mergeForPartitionUpdatesIfNeeded(incomingRecordsAndLocations,
config, table)
Review Comment:
It looks like we call mergeForPartitionUpdatesIfNeeded based only on
shouldUpdatePartitionPath.
In the case of RLI, even if shouldUpdatePartitionPath is set to true, if the
incoming record's location does not differ from the existing location, we should
avoid the snapshot load, right?
Can you help me understand whether we are good in this case?
Within mergeForPartitionUpdatesIfNeeded, I see that we call
getExistingRecords() irrespective of these conditions.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java:
##########
@@ -72,85 +68,37 @@ public <R> HoodieData<HoodieRecord<R>> tagLocation(
protected <R> HoodieData<HoodieRecord<R>> tagLocationInternal(
HoodieData<HoodieRecord<R>> inputRecords, HoodieEngineContext context,
HoodieTable hoodieTable) {
-
- HoodiePairData<String, HoodieRecord<R>> keyedInputRecords =
- inputRecords.mapToPair(entry -> new
ImmutablePair<>(entry.getRecordKey(), entry));
- HoodiePairData<HoodieKey, HoodieRecordLocation> allRecordLocationsInTable =
- fetchAllRecordLocations(context, hoodieTable,
config.getGlobalSimpleIndexParallelism());
- return getTaggedRecords(keyedInputRecords, allRecordLocationsInTable,
hoodieTable);
+ List<Pair<String, HoodieBaseFile>> latestBaseFiles =
getAllBaseFilesInTable(context, hoodieTable);
Review Comment:
We could move L71 into fetchRecordGlobalLocations(), right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]