CTTY commented on code in PR #7359:
URL: https://github.com/apache/hudi/pull/7359#discussion_r1382290995


##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java:
##########
@@ -99,24 +100,34 @@ public List<HoodieRecord<T>> deduplicateRecords(
 
     // caution that the avro schema is not serializable
     final Schema schema = new Schema.Parser().parse(schemaStr);
-    return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> {
-      HoodieRecord<T> reducedRecord;
-      try {
-        // Precombine do not need schema and do not return null
-        reducedRecord =  merger.merge(rec1, schema, rec2, schema, props).get().getLeft();
-      } catch (IOException e) {
-        throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
+    boolean sortBeforePrecombine = merger.useSortedMerge(props);
+    return keyedRecords.values().stream().map(hoodieRecords -> {
+      Stream<HoodieRecord<T>> recordStream;
+      if (sortBeforePrecombine) {
+        recordStream = hoodieRecords.stream().sorted(new SortedPrecombineComparator(schema, props));

Review Comment:
   The merge order matters and can affect correctness. Here is an example:
   Suppose we receive the three records below, in this order, and the ordering
   key for each record is its timestamp (`ts`).
   1st record:
   |ts|city|data|changed_cols|
   |--|----|----|---|
   |1|Seattle|0| |
   ---
   2nd record:
   |ts|city|data|changed_cols|
   |--|----|----|---|
   |5|San Francisco|0|city|
   ---
   3rd record:
   |ts|city|data|changed_cols|
   |--|----|----|---|
   |3|San Jose|0|city|
   ---
   Ideally, the merged record reflects the newest update and ends up storing
   `San Francisco`. If we don't sort the records first, `city` would be
   overwritten by the 3rd record (`San Jose`), even though that record is older
   (`ts` = 3) than the 2nd record (`ts` = 5). The "sorted merge" logic can't be
   implemented within `merger` because it is only aware of the two records it is
   merging at any one time.
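
To make the example concrete, here is a minimal, self-contained sketch of the scenario. It is not Hudi code: `Rec`, `merge`, `mergeAll`, and `sampleRecords` are hypothetical names, and the pairwise merge is assumed to simply copy the columns listed in the incoming record's `changed_cols` onto the accumulated record, which is what makes the result depend on order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

public class SortedMergeSketch {

  // Hypothetical stand-in for a partial-update record; not a Hudi class.
  static final class Rec {
    final long ts;
    final String city;
    final int data;
    final Set<String> changedCols;

    Rec(long ts, String city, int data, Set<String> changedCols) {
      this.ts = ts;
      this.city = city;
      this.data = data;
      this.changedCols = changedCols;
    }
  }

  // Assumed pairwise merge: it only sees two records, and applies the
  // columns named in next.changedCols on top of the accumulated record.
  static Rec merge(Rec acc, Rec next) {
    String city = next.changedCols.contains("city") ? next.city : acc.city;
    int data = next.changedCols.contains("data") ? next.data : acc.data;
    return new Rec(Math.max(acc.ts, next.ts), city, data, next.changedCols);
  }

  // Folds the records left to right, optionally sorting by ts first.
  static Rec mergeAll(List<Rec> records, boolean sortFirst) {
    List<Rec> ordered = new ArrayList<>(records);
    if (sortFirst) {
      ordered.sort(Comparator.comparingLong((Rec r) -> r.ts));
    }
    Rec acc = ordered.get(0);
    for (int i = 1; i < ordered.size(); i++) {
      acc = merge(acc, ordered.get(i));
    }
    return acc;
  }

  // The three records from the example, in arrival order.
  static List<Rec> sampleRecords() {
    return Arrays.asList(
        new Rec(1, "Seattle", 0, Collections.<String>emptySet()),
        new Rec(5, "San Francisco", 0, Collections.singleton("city")),
        new Rec(3, "San Jose", 0, Collections.singleton("city")));
  }

  public static void main(String[] args) {
    // Arrival order: the ts=3 update is applied last and wins.
    System.out.println(mergeAll(sampleRecords(), false).city); // San Jose
    // Sorted by ts: the ts=5 update is applied last and wins.
    System.out.println(mergeAll(sampleRecords(), true).city); // San Francisco
  }
}
```

Under these assumptions, the only difference between the two runs is the sort step in front of the fold; the pairwise `merge` itself is unchanged.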


