vinothchandar commented on code in PR #8344:
URL: https://github.com/apache/hudi/pull/8344#discussion_r1159331827


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java:
##########
@@ -244,6 +244,12 @@ public class HoodieIndexConfig extends HoodieConfig {
       .defaultValue("true")
       .withDocumentation("Similar to " + 
BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index.");
 
+  public static final ConfigProperty<String> GLOBAL_INDEX_DEDUP_PARALLELISM = 
ConfigProperty

Review Comment:
   lets make sure this is tagged an adv config? or not exposed to user by 
default. User should n't have to tune this.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java:
##########
@@ -244,6 +244,12 @@ public class HoodieIndexConfig extends HoodieConfig {
       .defaultValue("true")
       .withDocumentation("Similar to " + 
BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index.");
 
+  public static final ConfigProperty<String> GLOBAL_INDEX_DEDUP_PARALLELISM = 
ConfigProperty

Review Comment:
   also calling this `deduping` overloads the meaning a bit. - we are not 
removing the duplicates per see, right? We only ensure the tagging routes it to 
the right record? `"hoodie.global.index.reconcile.parallelism"`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -168,4 +171,36 @@ public static List<String> filterKeysFromFile(Path 
filePath, List<String> candid
     }
     return foundRecordKeys;
   }
+
+  public static <R> HoodieData<HoodieRecord<R>> 
dedupForPartitionUpdates(HoodieData<Pair<HoodieRecord<R>, Boolean>> 
taggedHoodieRecords, int dedupParallelism) {
+    /*
+     * In case a record is updated from p1 to p2 and then to p3, 2 existing 
records
+     * will be tagged for the incoming record to insert to p3. So we dedup 
them here. (Set A)
+     */
+    HoodiePairData<String, HoodieRecord<R>> deduped = 
taggedHoodieRecords.filter(Pair::getRight)
+        .map(Pair::getLeft)
+        .distinctWithKey(HoodieRecord::getKey, dedupParallelism)
+        .mapToPair(r -> Pair.of(r.getRecordKey(), r));
+
+    /*
+     * This includes
+     *  - tagged existing records whose partition paths are not to be updated 
(Set B)
+     *  - completely new records (Set C)
+     */
+    HoodieData<HoodieRecord<R>> undeduped = taggedHoodieRecords.filter(p -> 
!p.getRight()).map(Pair::getLeft);
+
+    /*
+     * There can be intersection between Set A and Set B mentioned above.
+     *
+     * Example: record X is updated from p1 to p2 and then back to p1.
+     * Set A will contain an insert to p1 and Set B will contain an update to 
p1.
+     *
+     * So we let A left-anti join B to drop the insert from Set A and keep the 
update in Set B.
+     */
+    return deduped.leftOuterJoin(undeduped
+            .filter(r -> !(r.getData() instanceof EmptyHoodieRecordPayload))

Review Comment:
   should we instead be applying the payload to the old and new record?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -168,4 +171,36 @@ public static List<String> filterKeysFromFile(Path 
filePath, List<String> candid
     }
     return foundRecordKeys;
   }
+
+  public static <R> HoodieData<HoodieRecord<R>> 
dedupForPartitionUpdates(HoodieData<Pair<HoodieRecord<R>, Boolean>> 
taggedHoodieRecords, int dedupParallelism) {
+    /*
+     * In case a record is updated from p1 to p2 and then to p3, 2 existing 
records
+     * will be tagged for the incoming record to insert to p3. So we dedup 
them here. (Set A)
+     */
+    HoodiePairData<String, HoodieRecord<R>> deduped = 
taggedHoodieRecords.filter(Pair::getRight)
+        .map(Pair::getLeft)
+        .distinctWithKey(HoodieRecord::getKey, dedupParallelism)
+        .mapToPair(r -> Pair.of(r.getRecordKey(), r));
+
+    /*
+     * This includes
+     *  - tagged existing records whose partition paths are not to be updated 
(Set B)
+     *  - completely new records (Set C)
+     */
+    HoodieData<HoodieRecord<R>> undeduped = taggedHoodieRecords.filter(p -> 
!p.getRight()).map(Pair::getLeft);
+
+    /*
+     * There can be intersection between Set A and Set B mentioned above.
+     *
+     * Example: record X is updated from p1 to p2 and then back to p1.
+     * Set A will contain an insert to p1 and Set B will contain an update to 
p1.
+     *
+     * So we let A left-anti join B to drop the insert from Set A and keep the 
update in Set B.
+     */
+    return deduped.leftOuterJoin(undeduped
+            .filter(r -> !(r.getData() instanceof EmptyHoodieRecordPayload))

Review Comment:
   which is kind of the semantics we should be going for?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to