yihua commented on code in PR #14090:
URL: https://github.com/apache/hudi/pull/14090#discussion_r2443905475


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1089,25 +1089,40 @@ public static Pair<Set<String>, Set<String>> 
computeRevivedAndDeletedKeys(Set<St
   }
 
   /**
-   * There are chances that same record key from data table has 2 entries (1 
delete from older partition and 1 insert to newer partition)
-   * So, this method performs reduce by key to ignore the deleted entry.
-   * @param recordIndexRecords hoodie records after rli index lookup.
-   * @param parallelism parallelism to use.
-   * @return
+   * Reduces metadata table index records by their metadata record key to 
ensure only a single
+   * update per metadata record key.
+   * <p>
+   * This is critical for handling scenarios where the same metadata record 
key appears multiple times:
+   * - Partition movements: A data record moving from one partition to another 
generates both
+   * a delete entry (from old partition) and an insert entry (to new partition)
+   * - File group movements: A record moving from one file group to another
+   * - When usePartitionInKey is true: Groups by (metadata record key, 
partition path) tuple
+   * - When usePartitionInKey is false: Groups by metadata record key only
+   * <p>
+   * The reduce logic prefers non-deleted records over deleted ones to 
properly handle movements.
+   * For Record Level Index: When both records are non-deleted (indicating 
duplicate inserts for
+   * the same key), an exception is thrown since this represents an invalid 
state.
+   * For Secondary Index: When both records are non-deleted, the later record 
(record2) is preferred.
+   *
+   * @param metadataRecords   {@link HoodieData} of metadata table index 
records to be reduced by keys
+   * @param parallelism       parallelism for the reduce-by operation
+   * @param usePartitionInKey whether to use partition path as part of the 
grouping key
+   *                          (true for partitioned RLI, false for 
non-partitioned RLI and SI)
+   * @return HoodieData of deduplicated metadata records with one record per 
metadata record key
    */
   @VisibleForTesting
-  public static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> 
recordIndexRecords, int parallelism, boolean isPartitionedRLI) {
-    HoodiePairData<HoodieKey, HoodieRecord> recordIndexRecordsPair;
-    if (isPartitionedRLI) {
-      recordIndexRecordsPair = recordIndexRecords.mapToPair(r -> {
+  public static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> 
metadataRecords, int parallelism, boolean usePartitionInKey) {
+    HoodiePairData<HoodieKey, HoodieRecord> metadataRecordsPair;
+    if (usePartitionInKey) {
+      metadataRecordsPair = metadataRecords.mapToPair(r -> {
         String recordPartitionPath = r.isDelete(DELETE_CONTEXT, 
CollectionUtils.emptyProps())
             ? ((EmptyHoodieRecordPayloadWithPartition) 
r.getData()).getPartitionPath() : ((HoodieMetadataPayload) 
r.getData()).getDataPartition();
         return Pair.of(new HoodieKey(r.getRecordKey(), recordPartitionPath), 
r);
       });
     } else {
-      recordIndexRecordsPair = recordIndexRecords.mapToPair(r -> 
Pair.of(r.getKey(), r));
+      metadataRecordsPair = metadataRecords.mapToPair(r -> Pair.of(r.getKey(), 
r));
     }
-    return 
recordIndexRecordsPair.reduceByKey((SerializableBiFunction<HoodieRecord, 
HoodieRecord, HoodieRecord>) (record1, record2) -> {
+    return 
metadataRecordsPair.reduceByKey((SerializableBiFunction<HoodieRecord, 
HoodieRecord, HoodieRecord>) (record1, record2) -> {

Review Comment:
   @nsivabalan actually, for the secondary index, if there is both a delete and 
an update on the same metadata record key (e.g., `secondary_key$record_key`), 
we can drop both records, correct? The key itself carries all the information 
in a secondary index entry, unlike RLI, which also stores the record location 
in the record.
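
   To make the question concrete, here is a minimal sketch of the behavior I 
have in mind. It uses plain Java collections and a hypothetical `Entry` type 
as a stand-in for a secondary index metadata record; neither 
`Entry` nor `cancelDeleteUpdatePairs` exists in Hudi, and this is not how 
`reduceByKeys` is written today. Whether dropping both records is actually 
safe is exactly what I am asking.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SecondaryIndexReduceSketch {

  /** Hypothetical stand-in for a secondary index metadata record (not a Hudi class). */
  public static final class Entry {
    public final String key;      // e.g. "secondary_key$record_key"
    public final boolean deleted; // true if this entry is a delete marker

    public Entry(String key, boolean deleted) {
      this.key = key;
      this.deleted = deleted;
    }
  }

  /**
   * Groups entries by key. If a key has both a delete and a non-delete entry,
   * the pair is assumed to cancel out and nothing is emitted for that key.
   * Otherwise the last entry seen for the key is kept.
   */
  public static List<Entry> cancelDeleteUpdatePairs(List<Entry> entries) {
    // LinkedHashMap keeps first-seen key order so the output is deterministic.
    Map<String, List<Entry>> byKey = new LinkedHashMap<>();
    for (Entry e : entries) {
      byKey.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);
    }

    List<Entry> result = new ArrayList<>();
    for (List<Entry> group : byKey.values()) {
      boolean hasDelete = group.stream().anyMatch(e -> e.deleted);
      boolean hasUpsert = group.stream().anyMatch(e -> !e.deleted);
      if (hasDelete && hasUpsert) {
        continue; // delete + update on the same key: emit neither
      }
      result.add(group.get(group.size() - 1));
    }
    return result;
  }
}
```

   With a pairwise `reduceByKey` this would likely need a follow-up filter 
step, since the reduce function always has to return one record per key.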


