lokeshj1703 commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2252534516


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +102,70 @@ public I combineOnCondition(
    * @return Collection of HoodieRecord already be deduplicated
    */
   public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int 
parallelism) {
+    HoodieReaderContext<T> readerContext =
+        (HoodieReaderContext<T>) 
table.getContext().<T>getReaderContextFactoryForWrite(table.getMetaClient(), 
table.getConfig().getRecordMerger().getRecordType(), 
table.getConfig().getProps())
+            .getContext();
+    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    readerContext.initRecordMerger(table.getConfig().getProps());
+    List<String> orderingFieldNames = 
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), 
table.getConfig().getProps(), table.getMetaClient());
     HoodieRecordMerger recordMerger = 
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
-    return deduplicateRecords(records, table.getIndex(), parallelism, 
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+    RecordMergeMode recordMergeMode = 
HoodieTableConfig.inferCorrectMergingBehavior(null, 
table.getConfig().getPayloadClass(), null,
+        String.join(",", orderingFieldNames), 
tableConfig.getTableVersion()).getLeft();
+    Schema recordSchema;
+    if (StringUtils.nonEmpty(table.getConfig().getPartialUpdateSchema())) {
+      recordSchema = new 
Schema.Parser().parse(table.getConfig().getPartialUpdateSchema());

Review Comment:
   Addressed



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +102,70 @@ public I combineOnCondition(
    * @return Collection of HoodieRecord already be deduplicated
    */
   public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int 
parallelism) {
+    HoodieReaderContext<T> readerContext =
+        (HoodieReaderContext<T>) 
table.getContext().<T>getReaderContextFactoryForWrite(table.getMetaClient(), 
table.getConfig().getRecordMerger().getRecordType(), 
table.getConfig().getProps())
+            .getContext();
+    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    readerContext.initRecordMerger(table.getConfig().getProps());
+    List<String> orderingFieldNames = 
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), 
table.getConfig().getProps(), table.getMetaClient());
     HoodieRecordMerger recordMerger = 
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
-    return deduplicateRecords(records, table.getIndex(), parallelism, 
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+    RecordMergeMode recordMergeMode = 
HoodieTableConfig.inferCorrectMergingBehavior(null, 
table.getConfig().getPayloadClass(), null,
+        String.join(",", orderingFieldNames), 
tableConfig.getTableVersion()).getLeft();
+    Schema recordSchema;
+    if (StringUtils.nonEmpty(table.getConfig().getPartialUpdateSchema())) {
+      recordSchema = new 
Schema.Parser().parse(table.getConfig().getPartialUpdateSchema());
+    } else {
+      recordSchema = new 
Schema.Parser().parse(table.getConfig().getWriteSchema());
+    }
+    BufferedRecordMerger<T> bufferedRecordMerger = 
BufferedRecordMergerFactory.create(
+        readerContext,
+        recordMergeMode,
+        false,
+        Option.ofNullable(recordMerger),
+        orderingFieldNames,
+        Option.ofNullable(table.getConfig().getPayloadClass()),
+        recordSchema,
+        table.getConfig().getProps(),
+        tableConfig.getPartialUpdateMode());
+    return deduplicateRecords(
+        records,
+        table.getIndex(),
+        parallelism,
+        table.getConfig().getSchema(),
+        table.getConfig().getProps(),
+        bufferedRecordMerger,
+        readerContext,
+        orderingFieldNames.toArray(new String[0]));
   }
 
-  public abstract I deduplicateRecords(I records, HoodieIndex<?, ?> index, int 
parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
+  public abstract I deduplicateRecords(I records,
+                                       HoodieIndex<?, ?> index,
+                                       int parallelism,
+                                       String schema,
+                                       TypedProperties props,
+                                       BufferedRecordMerger<T> merger,
+                                       HoodieReaderContext<T> readerContext,
+                                       String[] orderingFieldNames);
+
+  protected static <T> HoodieRecord<T> reduceRecords(TypedProperties props, 
BufferedRecordMerger<T> recordMerger, String[] orderingFieldNames,
+                                                     HoodieRecord<T> previous, 
HoodieRecord<T> next, Schema schema, RecordContext<T> recordContext) {
+    try {
+      // NOTE: The order of previous and next is uncertain within a batch in 
"reduceByKey".
+      // If the return value is empty, it means the previous should be chosen.
+      BufferedRecord<T> newBufferedRecord = 
BufferedRecord.forRecordWithContext(next, schema, recordContext, props, 
orderingFieldNames);
+      // Construct old buffered record.
+      BufferedRecord<T> oldBufferedRecord = 
BufferedRecord.forRecordWithContext(previous, schema, recordContext, props, 
orderingFieldNames);
+      // Run merge.
+      Option<BufferedRecord<T>> merged = 
recordMerger.deltaMerge(newBufferedRecord, oldBufferedRecord);
+      // NOTE: For merge mode based merging, it returns non-null.
+      //       For mergers / payloads based merging, it may return null.
+      HoodieRecord<T> reducedRecord = 
merged.map(recordContext::constructHoodieRecord).orElse(previous);
+      boolean choosePrevious = merged.isEmpty();
+      HoodieKey reducedKey = choosePrevious ? previous.getKey() : 
next.getKey();
+      HoodieOperation operation = choosePrevious ? previous.getOperation() : 
next.getOperation();
+      return reducedRecord.newInstance(reducedKey, operation);

Review Comment:
   Addressed



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java:
##########
@@ -226,7 +233,18 @@ private void initRecordConverter() {
   }
 
   private void initMergeClass() {
-    recordMerger = 
HoodieRecordUtils.mergerToPreCombineMode(writeClient.getConfig().getRecordMerger());
+    readerContext = 
writeClient.getEngineContext().<RowData>getReaderContextFactory(metaClient).getContext();
+    orderingFieldNames = getOrderingFieldNames(readerContext.getMergeMode(), 
writeClient.getConfig().getProps(), metaClient);
+    recordMerger = BufferedRecordMergerFactory.create(
+        readerContext,
+        writeClient.getConfig().getRecordMergeMode(),
+        false,
+        Option.ofNullable(writeClient.getConfig().getRecordMerger()),
+        orderingFieldNames,
+        Option.ofNullable(writeClient.getConfig().getPayloadClass()),
+        new SerializableSchema(writeClient.getConfig().getSchema()).get(),

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to