xushiyan commented on code in PR #8490:
URL: https://github.com/apache/hudi/pull/8490#discussion_r1183253432
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -175,4 +186,72 @@ public static boolean checkIfValidCommit(HoodieTimeline commitTimeline, String c
// 2) is less than the first commit ts in the timeline
return !commitTimeline.empty() && commitTimeline.containsOrBeforeTimelineStarts(commitTs);
}
+
+ public static <R> HoodieData<HoodieRecord<R>> getTaggedRecordsFromPartitionLocations(
+ HoodieData<Pair<String, HoodieRecordLocation>> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable) {
+ final Option<String> instantTime = hoodieTable
+ .getMetaClient()
+ .getCommitsTimeline()
+ .filterCompletedInstants()
+ .lastInstant()
+ .map(HoodieInstant::getTimestamp);
+ return partitionLocations.flatMap(p -> {
+ String partitionPath = p.getLeft();
+ String fileId = p.getRight().getFileId();
+ return new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(partitionPath, fileId))
+ .getMergedRecords().iterator();
+ });
+ }
+
+ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates(
+ HoodieData<Pair<HoodieRecord<R>, Option<Pair<String, HoodieRecordLocation>>>> taggedHoodieRecords, HoodieWriteConfig config, HoodieTable hoodieTable) {
+ // completely new records
+ HoodieData<HoodieRecord<R>> newRecords = taggedHoodieRecords.filter(p -> !p.getRight().isPresent()).map(Pair::getLeft);
+ // the records tagged to existing base files
+ HoodieData<HoodieRecord<R>> updatingRecords = taggedHoodieRecords.filter(p -> p.getRight().isPresent()).map(Pair::getLeft)
+ .distinctWithKey(HoodieRecord::getRecordKey, config.getGlobalIndexReconcileParallelism());
Review Comment:
At this point the tagged records will contain duplicates when the last write updated a record's partition path (inserting the record into the new partition) and compaction has not happened yet. The first lookup will still return 2 records for that key, because the join is done only against base files.
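To make the duplicate scenario concrete, here is a minimal, self-contained sketch in plain Java. It does not use Hudi: `Location`, `IncomingRecord`, `Tagged`, `tagAgainstBaseFiles` and `updatingRecords` are hypothetical stand-ins, and it assumes (as in the scenario described in the comment) that the stale copy of the moved key is still visible in the old partition's base file because the delete has not been compacted yet. It shows one incoming record being tagged twice by a base-file-only lookup, and how de-duplicating by record key, the role `distinctWithKey` plays in the diff, leaves a single updating record.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-ins; these are NOT Hudi's HoodieRecord /
// HoodieRecordLocation classes, only an illustration of the data flow.
public class GlobalIndexDedupSketch {

  record Location(String partitionPath, String fileId) {}

  record IncomingRecord(String recordKey, String partitionPath) {}

  record Tagged(IncomingRecord record, Optional<Location> location) {}

  // Simulates a lookup that joins incoming records against base files only:
  // every base-file location still holding the key yields one tagged pair,
  // and a key with no location is tagged with Optional.empty().
  static List<Tagged> tagAgainstBaseFiles(List<IncomingRecord> incoming,
                                          Map<String, List<Location>> baseFileIndex) {
    List<Tagged> tagged = new ArrayList<>();
    for (IncomingRecord r : incoming) {
      List<Location> locs = baseFileIndex.getOrDefault(r.recordKey(), List.of());
      if (locs.isEmpty()) {
        tagged.add(new Tagged(r, Optional.empty()));
      } else {
        for (Location loc : locs) {
          tagged.add(new Tagged(r, Optional.of(loc)));
        }
      }
    }
    return tagged;
  }

  // Mirrors filter(isPresent).map(getLeft) followed by a distinct on the
  // record key: keeps the first incoming record seen for each key.
  static List<IncomingRecord> updatingRecords(List<Tagged> tagged) {
    Map<String, IncomingRecord> byKey = new LinkedHashMap<>();
    for (Tagged t : tagged) {
      if (t.location().isPresent()) {
        byKey.putIfAbsent(t.record().recordKey(), t.record());
      }
    }
    return new ArrayList<>(byKey.values());
  }

  public static void main(String[] args) {
    // Assumption: key1 was moved from p1 to p2 by the last write; its delete in
    // p1 is not compacted yet, so both base files still contain key1.
    Map<String, List<Location>> baseFiles = Map.of(
        "key1", List.of(new Location("p1", "f1"), new Location("p2", "f2")));
    List<IncomingRecord> incoming = List.of(
        new IncomingRecord("key1", "p3"), new IncomingRecord("key2", "p1"));

    List<Tagged> tagged = tagAgainstBaseFiles(incoming, baseFiles);
    System.out.println("tagged pairs: " + tagged.size());
    System.out.println("updating records: " + updatingRecords(tagged).size());
  }
}
```

Running `main` prints `tagged pairs: 3` and `updating records: 1`: `key1` is paired with both `p1/f1` and `p2/f2`, while `key2` has no location and would fall into the new-records branch. The sketch only models the de-duplication; it does not decide which of the two locations reflects the latest state of the record.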
--
This is an automated message from the Apache Git Service.