danny0405 commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2252944027


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +102,70 @@ public I combineOnCondition(
    * @return Collection of HoodieRecord already be deduplicated
    */
   public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int 
parallelism) {
+    HoodieReaderContext<T> readerContext =
+        (HoodieReaderContext<T>) 
table.getContext().<T>getReaderContextFactoryForWrite(table.getMetaClient(), 
table.getConfig().getRecordMerger().getRecordType(), 
table.getConfig().getProps())
+            .getContext();
+    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    readerContext.initRecordMerger(table.getConfig().getProps());
+    List<String> orderingFieldNames = 
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), 
table.getConfig().getProps(), table.getMetaClient());
     HoodieRecordMerger recordMerger = 
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
-    return deduplicateRecords(records, table.getIndex(), parallelism, 
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+    RecordMergeMode recordMergeMode = 
HoodieTableConfig.inferCorrectMergingBehavior(null, 
table.getConfig().getPayloadClass(), null,
+        String.join(",", orderingFieldNames), 
tableConfig.getTableVersion()).getLeft();
+    Schema recordSchema;
+    if (StringUtils.nonEmpty(table.getConfig().getPartialUpdateSchema())) {
+      recordSchema = new 
Schema.Parser().parse(table.getConfig().getPartialUpdateSchema());
+    } else {
+      recordSchema = new 
Schema.Parser().parse(table.getConfig().getWriteSchema());
+    }
+    BufferedRecordMerger<T> bufferedRecordMerger = 
BufferedRecordMergerFactory.create(
+        readerContext,
+        recordMergeMode,
+        false,
+        Option.ofNullable(recordMerger),
+        orderingFieldNames,
+        Option.ofNullable(table.getConfig().getPayloadClass()),
+        recordSchema,
+        table.getConfig().getProps(),
+        tableConfig.getPartialUpdateMode());
+    return deduplicateRecords(
+        records,
+        table.getIndex(),
+        parallelism,
+        table.getConfig().getSchema(),
+        table.getConfig().getProps(),
+        bufferedRecordMerger,
+        readerContext,
+        orderingFieldNames.toArray(new String[0]));
   }
 
-  public abstract I deduplicateRecords(I records, HoodieIndex<?, ?> index, int 
parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
+  public abstract I deduplicateRecords(I records,
+                                       HoodieIndex<?, ?> index,
+                                       int parallelism,
+                                       String schema,
+                                       TypedProperties props,
+                                       BufferedRecordMerger<T> merger,
+                                       HoodieReaderContext<T> readerContext,
+                                       String[] orderingFieldNames);
+
+  protected static <T> HoodieRecord<T> reduceRecords(TypedProperties props, 
BufferedRecordMerger<T> recordMerger, String[] orderingFieldNames,
+                                                     HoodieRecord<T> previous, 
HoodieRecord<T> next, Schema schema, RecordContext<T> recordContext) {
+    try {
+      // NOTE: The order of previous and next is uncertain within a batch in 
"reduceByKey".
+      // If the return value is empty, it means the previous should be chosen.
+      BufferedRecord<T> newBufferedRecord = 
BufferedRecord.forRecordWithContext(next, schema, recordContext, props, 
orderingFieldNames);
+      // Construct old buffered record.
+      BufferedRecord<T> oldBufferedRecord = 
BufferedRecord.forRecordWithContext(previous, schema, recordContext, props, 
orderingFieldNames);
+      // Run merge.
+      Option<BufferedRecord<T>> merged = 
recordMerger.deltaMerge(newBufferedRecord, oldBufferedRecord);
+      // NOTE: For merge mode based merging, it returns non-null.
+      //       For mergers / payloads based merging, it may return null.
+      HoodieRecord<T> reducedRecord = 
merged.map(recordContext::constructHoodieRecord).orElse(previous);
+      boolean choosePrevious = merged.isEmpty();
+      HoodieKey reducedKey = choosePrevious ? previous.getKey() : 
next.getKey();
+      HoodieOperation operation = choosePrevious ? previous.getOperation() : 
next.getOperation();
+      return reducedRecord.newInstance(reducedKey, operation);

Review Comment:
   I checked the `HoodieWriteMergeHandle` and there are no usages of locations, 
so we should be safe; the location is used in the append handle, but there should 
be no deduplication for this scenario.
   
   So I reverted the location setup — sorry for the mistake.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to