stayrascal commented on a change in pull request #4724:
URL: https://github.com/apache/hudi/pull/4724#discussion_r814533349



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -58,6 +58,31 @@ default T preCombine(T oldValue, Properties properties) {
     return preCombine(oldValue);
   }
 
+  /**
+   * When more than one HoodieRecord has the same HoodieKey in the incoming batch, this function combines them before attempting to insert/upsert by taking in a property map.
+   *
+   * @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with.
+   * @param properties Payload-related properties. For example, pass the ordering field(s) name to extract from the value in storage.
+   * @param schema Schema used for the record
+   * @return the combined value
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default T preCombine(T oldValue, Properties properties, Schema schema) {

Review comment:
   Hi @alexeykudinkin, I got your point: if we have to combine two records into one, you'd rather implement the combining logic somewhere else, maybe in some `util` or `helper` classes, right?
   
   Here are a couple of reasons why I think `#preCombine` might be the better place for this logic (or, alternatively, why we could add a new `merge` method to the `HoodieRecordPayload` interface):
   - First, from the description of the `preCombine` method, it is used for combining multiple records with the same HoodieKey before attempting to insert/upsert to disk. "Combining multiple records" does not necessarily mean only choosing one of them; we can also merge them into a new record, depending on how the subclass implements the `preCombine` logic (please correct me if my understanding is wrong :) ). Yeah, it might be a little confusing that we need a `Schema` if we are trying to merge them; a rough sketch of such a merging payload is shown after this list.
   - Second, I checked where we call the `preCombine` method: it is used to de-duplicate records with the same HoodieKey before inserting/updating to disk, especially in the Flink write path. Even though the current de-duplication logic just chooses the latest record, we still need to ensure that one HoodieKey maps to exactly one record before comparing against the existing record and writing to disk; otherwise, some records will be missed. For example, `HoodieMergeHandle.init(fileId, newRecordsIter)` converts the record iterator into a map keyed by recordKey. *So we cannot drop the de-duping logic and merge against what is on disk unless we change the logic there.* Also, if we implement another class/method to handle the merge logic and switch the existing de-duping code from calling `preCombine` to that new class/method, we would have to add a condition controlling whether `preCombine` should be called or not, which I don't think is a good approach. Instead, we should handle it in the `preCombine` method via different payload implementations; see the second sketch after this list.
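   
   To make the first point concrete, here is a minimal sketch of a merging payload built on the new schema-aware overload. The class name `PartialUpdateAvroPayload` and its null-fill merge rule are my own illustrative assumptions, not code from this PR:
   
```java
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.exception.HoodieIOException;

// Hypothetical payload that merges two versions of a record field by field
// instead of just keeping the latest one.
public class PartialUpdateAvroPayload extends OverwriteWithLatestAvroPayload {

  public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  @Override
  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue,
                                                   Properties properties, Schema schema) {
    try {
      GenericRecord newRecord = (GenericRecord) getInsertValue(schema).get();
      GenericRecord oldRecord = (GenericRecord) oldValue.getInsertValue(schema).get();
      // Field-level merge instead of "pick one": keep the new value when it is
      // present, otherwise fall back to the old record's value. This is exactly
      // why the Schema parameter is needed -- we have to enumerate the fields.
      for (Schema.Field field : schema.getFields()) {
        if (newRecord.get(field.name()) == null) {
          newRecord.put(field.name(), oldRecord.get(field.name()));
        }
      }
      return new PartialUpdateAvroPayload(newRecord, this.orderingVal);
    } catch (IOException e) {
      throw new HoodieIOException("Failed to merge payloads in preCombine", e);
    }
  }
}
```
   
   (Of course, a real implementation would also need to respect the ordering value and handle deletes; this only shows where the merge could live.)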
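   
   And for the second point, here is a rough stand-in (not the actual `HoodieMergeHandle` source) for how keying the incoming records by recordKey drops duplicates unless the collision is routed through `preCombine`:
   
```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;

public class MergeHandleSketch {
  // Illustrative only: builds the recordKey -> record map that the merge
  // handle later iterates against the records already on disk.
  static <T extends HoodieRecordPayload<T>> Map<String, HoodieRecord<T>> mapByRecordKey(
      Iterator<HoodieRecord<T>> newRecordsItr, Properties props, Schema schema) {
    Map<String, HoodieRecord<T>> keyToNewRecords = new HashMap<>();
    while (newRecordsItr.hasNext()) {
      HoodieRecord<T> incoming = newRecordsItr.next();
      keyToNewRecords.merge(incoming.getRecordKey(), incoming,
          // A plain put() would silently overwrite the earlier duplicate;
          // combining through preCombine keeps both records' information.
          (older, newer) -> new HoodieRecord<>(newer.getKey(),
              newer.getData().preCombine(older.getData(), props, schema)));
    }
    return keyToNewRecords;
  }
}
```
   
   That is why I think the per-key combining has to happen in the payload itself rather than being skipped.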
   
   That's my thinking here, and I'd be glad to hear your suggestions. :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

