nsivabalan commented on code in PR #8344:
URL: https://github.com/apache/hudi/pull/8344#discussion_r1156783371


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -168,4 +171,36 @@ public static List<String> filterKeysFromFile(Path filePath, List<String> candid
     }
     return foundRecordKeys;
   }
+
+  public static <R> HoodieData<HoodieRecord<R>> dedupForPartitionUpdates(HoodieData<Pair<HoodieRecord<R>, Boolean>> taggedHoodieRecords, int dedupParallelism) {
+    /*
+     * In case a record is updated from p1 to p2 and then to p3, 2 existing records
+     * will be tagged for the incoming record to insert to p3. So we dedup them here. (Set A)
+     */
+    HoodiePairData<String, HoodieRecord<R>> deduped = taggedHoodieRecords.filter(Pair::getRight)
+        .map(Pair::getLeft)
+        .distinctWithKey(HoodieRecord::getKey, dedupParallelism)
+        .mapToPair(r -> Pair.of(r.getRecordKey(), r));
+
+    /*
+     * This includes
+     *  - tagged existing records whose partition paths are not to be updated (Set B)
+     *  - completely new records (Set C)
+     */
+    HoodieData<HoodieRecord<R>> undeduped = taggedHoodieRecords.filter(p -> !p.getRight()).map(Pair::getLeft);
+
+    /*
+     * There can be intersection between Set A and Set B mentioned above.
+     *
+     * Example: record X is updated from p1 to p2 and then back to p1.
+     * Set A will contain an insert to p1 and Set B will contain an update to p1.
+     *
+     * So we let A left-anti join B to drop the insert from Set A and keep the update in Set B.
+     */
+    return deduped.leftOuterJoin(undeduped
+            .filter(r -> !(r.getData() instanceof EmptyHoodieRecordPayload))

Review Comment:
   Synced up directly. Let's add Javadocs to call this out, i.e., why we should
   strictly favor the update record over the insert, so that anyone looking to
   change this code block is aware of all the nuances.
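
   To make the nuance concrete, here is a minimal sketch of the "favor the
   update over the insert" rule using plain Java collections. It is not the
   Hudi implementation: `Rec`, `favorUpdates`, and the class name are
   hypothetical stand-ins, the real code works on `HoodieData` and
   `HoodiePairData`, and the `EmptyHoodieRecordPayload` filter from the diff
   is left out. It assumes Set A is deduped on key plus partition path and
   then anti-joined against Sets B and C on record key alone.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PartitionUpdateDedupSketch {

  /** Hypothetical stand-in for a tagged record; this is not a Hudi class. */
  static final class Rec {
    final String key;
    final String partition;
    final boolean isUpdate;

    Rec(String key, String partition, boolean isUpdate) {
      this.key = key;
      this.partition = partition;
      this.isUpdate = isUpdate;
    }

    @Override
    public String toString() {
      return (isUpdate ? "update" : "insert") + "(" + key + "@" + partition + ")";
    }
  }

  /**
   * Dedups Set A, then keeps an insert from Set A only when no record with
   * the same record key exists in Sets B and C (a left-anti join on key).
   */
  static List<Rec> favorUpdates(List<Rec> setA, List<Rec> setBAndC) {
    // Dedup Set A on (record key, partition path); the first occurrence wins.
    Map<String, Rec> dedupedA = new LinkedHashMap<>();
    for (Rec r : setA) {
      dedupedA.putIfAbsent(r.key + "|" + r.partition, r);
    }

    // Record keys already covered by Sets B and C.
    Set<String> coveredKeys = new HashSet<>();
    for (Rec r : setBAndC) {
      coveredKeys.add(r.key);
    }

    // Sets B and C pass through untouched; Set A contributes only uncovered keys.
    List<Rec> result = new ArrayList<>(setBAndC);
    for (Rec r : dedupedA.values()) {
      if (!coveredKeys.contains(r.key)) {
        result.add(r);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // X moved p1 -> p2 -> p1: Set A holds an insert to p1, Set B an update to p1.
    // Y moved p1 -> p2 -> p3: two existing records were tagged, so Set A holds
    // the same insert twice.
    List<Rec> setA = List.of(
        new Rec("X", "p1", false),
        new Rec("Y", "p3", false),
        new Rec("Y", "p3", false));
    List<Rec> setBAndC = List.of(
        new Rec("X", "p1", true),    // Set B
        new Rec("Z", "p4", false));  // Set C
    System.out.println(favorUpdates(setA, setBAndC));
  }
}
```

   In this sketch the insert for X is dropped because an update for the same
   key already targets p1. Keeping both would write X twice into the same
   partition, which is the kind of detail the Javadoc should spell out.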


