vinothchandar commented on code in PR #8344:
URL: https://github.com/apache/hudi/pull/8344#discussion_r1159331827
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java:
##########
@@ -244,6 +244,12 @@ public class HoodieIndexConfig extends HoodieConfig {
.defaultValue("true")
.withDocumentation("Similar to " +
BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index.");
+  public static final ConfigProperty<String> GLOBAL_INDEX_DEDUP_PARALLELISM = ConfigProperty
Review Comment:
Let's make sure this is tagged as an advanced config, or not exposed to the
user by default? Users shouldn't have to tune this.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java:
##########
@@ -244,6 +244,12 @@ public class HoodieIndexConfig extends HoodieConfig {
.defaultValue("true")
.withDocumentation("Similar to " +
BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index.");
+  public static final ConfigProperty<String> GLOBAL_INDEX_DEDUP_PARALLELISM = ConfigProperty
Review Comment:
Also, calling this `deduping` overloads the meaning a bit. We are not
removing the duplicates per se, right? We only ensure the tagging routes it to
the right record? How about `"hoodie.global.index.reconcile.parallelism"`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -168,4 +171,36 @@ public static List<String> filterKeysFromFile(Path filePath, List<String> candid
}
return foundRecordKeys;
}
+
+  public static <R> HoodieData<HoodieRecord<R>> dedupForPartitionUpdates(
+      HoodieData<Pair<HoodieRecord<R>, Boolean>> taggedHoodieRecords, int dedupParallelism) {
+    /*
+     * In case a record is updated from p1 to p2 and then to p3, 2 existing records
+     * will be tagged for the incoming record to insert to p3. So we dedup them here. (Set A)
+     */
+    HoodiePairData<String, HoodieRecord<R>> deduped = taggedHoodieRecords.filter(Pair::getRight)
+        .map(Pair::getLeft)
+        .distinctWithKey(HoodieRecord::getKey, dedupParallelism)
+        .mapToPair(r -> Pair.of(r.getRecordKey(), r));
+
+    /*
+     * This includes
+     * - tagged existing records whose partition paths are not to be updated (Set B)
+     * - completely new records (Set C)
+     */
+    HoodieData<HoodieRecord<R>> undeduped = taggedHoodieRecords.filter(p -> !p.getRight()).map(Pair::getLeft);
+
+    /*
+     * There can be intersection between Set A and Set B mentioned above.
+     *
+     * Example: record X is updated from p1 to p2 and then back to p1.
+     * Set A will contain an insert to p1 and Set B will contain an update to p1.
+     *
+     * So we let A left-anti join B to drop the insert from Set A and keep the update in Set B.
+     */
+    return deduped.leftOuterJoin(undeduped
+        .filter(r -> !(r.getData() instanceof EmptyHoodieRecordPayload))
Review Comment:
Should we instead be applying the payload to the old and new record?
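To sketch what "applying the payload" could mean here, below is a minimal, self-contained example (hypothetical `Payload` record and `combine` helper, not Hudi's actual API): a preCombine-style merge decides the winner by ordering value, so a later delete takes effect instead of being filtered out before the join.

```java
import java.util.Optional;

// Hypothetical sketch, not Hudi's API: let the payload's combine logic
// decide which record wins, rather than unconditionally dropping deletes.
public class PayloadMergeSketch {

  // Stand-in for a record payload; 'deleted' mimics EmptyHoodieRecordPayload.
  public record Payload(String value, long orderingVal, boolean deleted) {}

  // preCombine-style merge: the record with the higher ordering value wins
  // (incoming wins ties); a winning delete yields an empty result.
  public static Optional<Payload> combine(Payload oldPayload, Payload newPayload) {
    Payload winner = newPayload.orderingVal() >= oldPayload.orderingVal() ? newPayload : oldPayload;
    return winner.deleted() ? Optional.empty() : Optional.of(winner);
  }

  public static void main(String[] args) {
    Payload existing = new Payload("v1", 1L, false);
    Payload incomingDelete = new Payload(null, 2L, true);
    // The delete arrived later, so the merged result is empty (record removed),
    // instead of the delete being silently discarded.
    System.out.println(combine(existing, incomingDelete).isEmpty());
  }
}
```

Under these semantics the filter on `EmptyHoodieRecordPayload` would move into the merge step rather than run before the join.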
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -168,4 +171,36 @@ public static List<String> filterKeysFromFile(Path filePath, List<String> candid
}
return foundRecordKeys;
}
+
+  public static <R> HoodieData<HoodieRecord<R>> dedupForPartitionUpdates(
+      HoodieData<Pair<HoodieRecord<R>, Boolean>> taggedHoodieRecords, int dedupParallelism) {
+    /*
+     * In case a record is updated from p1 to p2 and then to p3, 2 existing records
+     * will be tagged for the incoming record to insert to p3. So we dedup them here. (Set A)
+     */
+    HoodiePairData<String, HoodieRecord<R>> deduped = taggedHoodieRecords.filter(Pair::getRight)
+        .map(Pair::getLeft)
+        .distinctWithKey(HoodieRecord::getKey, dedupParallelism)
+        .mapToPair(r -> Pair.of(r.getRecordKey(), r));
+
+    /*
+     * This includes
+     * - tagged existing records whose partition paths are not to be updated (Set B)
+     * - completely new records (Set C)
+     */
+    HoodieData<HoodieRecord<R>> undeduped = taggedHoodieRecords.filter(p -> !p.getRight()).map(Pair::getLeft);
+
+    /*
+     * There can be intersection between Set A and Set B mentioned above.
+     *
+     * Example: record X is updated from p1 to p2 and then back to p1.
+     * Set A will contain an insert to p1 and Set B will contain an update to p1.
+     *
+     * So we let A left-anti join B to drop the insert from Set A and keep the update in Set B.
+     */
+    return deduped.leftOuterJoin(undeduped
+        .filter(r -> !(r.getData() instanceof EmptyHoodieRecordPayload))
Review Comment:
which is the kind of semantics we should be going for?
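To make the Set A / Set B reconciliation in the snippet concrete, here is a small collections-based sketch (hypothetical `Rec` type and `reconcile` helper, not the `HoodieData` API) of the left-anti-join semantics the code comments describe: records from Set A whose key also appears in Set B are dropped, so the in-place update wins over the re-insert.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch, not Hudi's API: Set A holds deduped inserts for
// partition-path updates; Set B holds tagged records staying in place.
public class LeftAntiJoinSketch {

  public record Rec(String key, String partition, String op) {}

  // Left-anti join A against B on record key, then union with B. This mirrors
  // deduped.leftOuterJoin(undeduped...).filter(right is empty) in the PR.
  public static List<Rec> reconcile(List<Rec> setA, List<Rec> setB) {
    Set<String> keysInB = setB.stream().map(Rec::key).collect(Collectors.toSet());
    return Stream.concat(
        setA.stream().filter(r -> !keysInB.contains(r.key())),  // drop A records shadowed by B
        setB.stream())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // X was moved p1 -> p2 -> back to p1: A has an insert to p1, B has an update to p1.
    List<Rec> setA = List.of(new Rec("X", "p1", "insert"), new Rec("Y", "p3", "insert"));
    List<Rec> setB = List.of(new Rec("X", "p1", "update"));
    // X survives only as the update from B; Y's insert is kept.
    reconcile(setA, setB).forEach(r -> System.out.println(r.key() + " " + r.partition() + " " + r.op()));
  }
}
```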
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]