nsivabalan commented on code in PR #8344:
URL: https://github.com/apache/hudi/pull/8344#discussion_r1156745651


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -168,4 +171,36 @@ public static List<String> filterKeysFromFile(Path filePath, List<String> candid
     }
     return foundRecordKeys;
   }
+
+  public static <R> HoodieData<HoodieRecord<R>> dedupForPartitionUpdates(HoodieData<Pair<HoodieRecord<R>, Boolean>> taggedHoodieRecords, int dedupParallelism) {
+    /*
+     * In case a record is updated from p1 to p2 and then to p3, 2 existing records
+     * will be tagged for the incoming record to insert to p3. So we dedup them here. (Set A)
+     */
+    HoodiePairData<String, HoodieRecord<R>> deduped = taggedHoodieRecords.filter(Pair::getRight)
+        .map(Pair::getLeft)
+        .distinctWithKey(HoodieRecord::getKey, dedupParallelism)
+        .mapToPair(r -> Pair.of(r.getRecordKey(), r));
+
+    /*
+     * This includes
+     *  - tagged existing records whose partition paths are not to be updated (Set B)
+     *  - completely new records (Set C)
+     */
+    HoodieData<HoodieRecord<R>> undeduped = taggedHoodieRecords.filter(p -> !p.getRight()).map(Pair::getLeft);
+
+    /*
+     * There can be intersection between Set A and Set B mentioned above.
+     *
+     * Example: record X is updated from p1 to p2 and then back to p1.
+     * Set A will contain an insert to p1 and Set B will contain an update to p1.
+     *
+     * So we let A left-anti join B to drop the insert from Set A and keep the update in Set B.
+     */
+    return deduped.leftOuterJoin(undeduped
+            .filter(r -> !(r.getData() instanceof EmptyHoodieRecordPayload))

Review Comment:
   Does it matter whether we favor the insert or the update here?
   If it does, I feel it's better to favor the insert and drop the update, so that we
   maintain the same behavior across the board: whenever a record migrates from
   one partition to another, we ignore whatever is in storage and do an insert
   into the incoming partition. To keep those semantics consistent, I'm wondering
   whether we should favor the insert record over the update here as well.
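   A minimal in-memory sketch of the two options, to make the trade-off concrete. It uses plain
   Java collections (Java 16+ records) instead of Hudi's HoodieData/HoodiePairData; the types
   Rec, Tagged and Op and the methods setA, setBC, favorUpdate and favorInsert are hypothetical.
   It assumes the join is on record key and that the truncated remainder of the method keeps only
   the unmatched Set A records and unions them with Sets B and C.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical in-memory model of dedupForPartitionUpdates; not Hudi's HoodieData API.
public class PartitionUpdateDedup {
  enum Op { INSERT, UPDATE, DELETE }

  // Stand-in for HoodieRecord: record key, partition path and the operation it represents.
  record Rec(String recordKey, String partitionPath, Op op) {}

  // Mirrors Pair<HoodieRecord<R>, Boolean>: true marks an insert into a new partition (Set A).
  record Tagged(Rec rec, boolean toNewPartition) {}

  // Set A, deduped on (recordKey, partitionPath) the way distinctWithKey(HoodieRecord::getKey) is.
  static List<Rec> setA(List<Tagged> tagged) {
    Map<List<String>, Rec> distinct = new LinkedHashMap<>();
    for (Tagged t : tagged) {
      if (t.toNewPartition()) {
        distinct.putIfAbsent(List.of(t.rec().recordKey(), t.rec().partitionPath()), t.rec());
      }
    }
    return new ArrayList<>(distinct.values());
  }

  // Sets B and C: every unflagged record, including deletes from old partitions.
  static List<Rec> setBC(List<Tagged> tagged) {
    return tagged.stream()
        .filter(t -> !t.toNewPartition())
        .map(Tagged::rec)
        .collect(Collectors.toList());
  }

  // Current PR: Set A left-anti-joins the non-delete part of B on record key, so the update wins.
  static List<Rec> favorUpdate(List<Tagged> tagged) {
    List<Rec> bc = setBC(tagged);
    Set<String> nonDeleteKeys = bc.stream()
        .filter(r -> r.op() != Op.DELETE)
        .map(Rec::recordKey)
        .collect(Collectors.toSet());
    List<Rec> out = new ArrayList<>();
    for (Rec a : setA(tagged)) {
      if (!nonDeleteKeys.contains(a.recordKey())) {
        out.add(a);
      }
    }
    out.addAll(bc);
    return out;
  }

  // Suggested alternative: keep the Set A insert, drop the conflicting non-delete record from B.
  static List<Rec> favorInsert(List<Tagged> tagged) {
    List<Rec> a = setA(tagged);
    Set<String> insertKeys = a.stream().map(Rec::recordKey).collect(Collectors.toSet());
    List<Rec> out = new ArrayList<>(a);
    for (Rec r : setBC(tagged)) {
      if (r.op() == Op.DELETE || !insertKeys.contains(r.recordKey())) {
        out.add(r);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Record X moved p1 -> p2 -> back to p1, and copies of X are tagged in both p1 and p2.
    List<Tagged> tagged = List.of(
        new Tagged(new Rec("X", "p1", Op.INSERT), true),   // insert into p1 (from the p2 copy)
        new Tagged(new Rec("X", "p2", Op.DELETE), false),  // delete from p2
        new Tagged(new Rec("X", "p1", Op.UPDATE), false),  // update of the existing p1 copy
        new Tagged(new Rec("Y", "p1", Op.INSERT), false)); // completely new record (Set C)
    System.out.println(favorUpdate(tagged));
    System.out.println(favorInsert(tagged));
  }
}
```

   With the PR's current logic the insert into p1 is dropped and the update of the existing p1
   copy survives; with the suggested alternative the insert survives and the update is dropped.
   In both variants the delete from p2 is kept, because deletes (EmptyHoodieRecordPayload in the
   PR) are excluded from the join. The sketch only shows which record survives the dedup; it does
   not model what the write path does with that record afterwards.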



-- 
This is an automated message from the Apache Git Service.