nsivabalan commented on code in PR #14090:
URL: https://github.com/apache/hudi/pull/14090#discussion_r2452175074
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1089,25 +1089,40 @@ public static Pair<Set<String>, Set<String>> computeRevivedAndDeletedKeys(Set<St
}
/**
- * There are chances that same record key from data table has 2 entries (1 delete from older partition and 1 insert to newer partition)
- * So, this method performs reduce by key to ignore the deleted entry.
- * @param recordIndexRecords hoodie records after rli index lookup.
- * @param parallelism parallelism to use.
- * @return
+ * Reduces metadata table index records by their metadata record key to ensure only a single
+ * update per metadata record key.
+ * <p>
+ * This is critical for handling scenarios where the same metadata record key appears multiple times:
+ * - Partition movements: A data record moving from one partition to another generates both
+ * a delete entry (from old partition) and an insert entry (to new partition)
+ * - File group movements: A record moving from one file group to another
+ * - When usePartitionInKey is true: Groups by (metadata record key, partition path) tuple
+ * - When usePartitionInKey is false: Groups by metadata record key only
+ * <p>
+ * The reduce logic prefers non-deleted records over deleted ones to properly handle movements.
+ * For Record Level Index: When both records are non-deleted (indicating duplicate inserts for
+ * the same key), an exception is thrown since this represents an invalid state.
+ * For Secondary Index: When both records are non-deleted, the later record (record2) is preferred.
+ *
+ * @param metadataRecords {@link HoodieData} of metadata table index records to be reduced by keys
+ * @param parallelism parallelism for the reduce-by operation
+ * @param usePartitionInKey whether to use partition path as part of the grouping key
+ * (true for partitioned RLI, false for non-partitioned RLI and SI)
+ * @return HoodieData of deduplicated metadata records with one record per metadata record key
*/
@VisibleForTesting
- public static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism, boolean isPartitionedRLI) {
- HoodiePairData<HoodieKey, HoodieRecord> recordIndexRecordsPair;
- if (isPartitionedRLI) {
- recordIndexRecordsPair = recordIndexRecords.mapToPair(r -> {
+ public static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> metadataRecords, int parallelism, boolean usePartitionInKey) {
+ HoodiePairData<HoodieKey, HoodieRecord> metadataRecordsPair;
+ if (usePartitionInKey) {
+ metadataRecordsPair = metadataRecords.mapToPair(r -> {
String recordPartitionPath = r.isDelete(DELETE_CONTEXT, CollectionUtils.emptyProps())
? ((EmptyHoodieRecordPayloadWithPartition) r.getData()).getPartitionPath() : ((HoodieMetadataPayload) r.getData()).getDataPartition();
return Pair.of(new HoodieKey(r.getRecordKey(), recordPartitionPath), r);
});
} else {
- recordIndexRecordsPair = recordIndexRecords.mapToPair(r -> Pair.of(r.getKey(), r));
+ metadataRecordsPair = metadataRecords.mapToPair(r -> Pair.of(r.getKey(), r));
}
- return recordIndexRecordsPair.reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, HoodieRecord>) (record1, record2) -> {
+ return metadataRecordsPair.reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, HoodieRecord>) (record1, record2) -> {
Review Comment:
Yes, if we have a delete and an update on the same key, we could remove it.
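
For reference, the reduce behaviour described in the updated Javadoc can be sketched in isolation. This is only a minimal illustration under stated assumptions: IndexEntry, merge, reduceByKey and the failOnDuplicateInserts flag are hypothetical names standing in for the RLI vs. SI distinction, and none of Hudi's classes (HoodieRecord, HoodieData, HoodiePairData) are used. The Javadoc does not say what happens when both entries are deletes; keeping the later one is an assumption made here.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ReduceByKeySketch {

  // Hypothetical stand-in for a metadata index record; not a Hudi class.
  static final class IndexEntry {
    final String key;       // metadata record key used for grouping
    final String location;  // e.g. the partition the entry points to
    final boolean deleted;  // true for a delete entry

    IndexEntry(String key, String location, boolean deleted) {
      this.key = key;
      this.location = location;
      this.deleted = deleted;
    }
  }

  // Merges two entries that share a key, preferring the non-deleted one.
  static IndexEntry merge(IndexEntry first, IndexEntry second, boolean failOnDuplicateInserts) {
    if (first.deleted && !second.deleted) {
      return second;
    }
    if (!first.deleted && second.deleted) {
      return first;
    }
    if (!first.deleted && failOnDuplicateInserts) {
      // RLI-like case: two live entries for one key is treated as invalid.
      throw new IllegalStateException("Duplicate non-deleted entries for key " + first.key);
    }
    // SI-like case (both live), or both deleted (assumption): keep the later entry.
    return second;
  }

  // Sequential equivalent of a reduce-by-key over the entries.
  static Map<String, IndexEntry> reduceByKey(List<IndexEntry> entries, boolean failOnDuplicateInserts) {
    Map<String, IndexEntry> reduced = new LinkedHashMap<>();
    for (IndexEntry entry : entries) {
      reduced.merge(entry.key, entry, (a, b) -> merge(a, b, failOnDuplicateInserts));
    }
    return reduced;
  }

  public static void main(String[] args) {
    // A record moving from partition p1 to p2 yields a delete and an insert.
    List<IndexEntry> moved = Arrays.asList(
        new IndexEntry("k1", "p1", true),
        new IndexEntry("k1", "p2", false));
    IndexEntry kept = reduceByKey(moved, true).get("k1");
    System.out.println(kept.key + " -> " + kept.location + ", deleted=" + kept.deleted);
  }
}
```

In this sketch the partition move collapses to the single live entry, which is the case the old Javadoc described; the flag only matters when two live entries collide.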