stayrascal commented on a change in pull request #4141:
URL: https://github.com/apache/hudi/pull/4141#discussion_r788331396



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateWithLatestAvroPayload.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
+
+/**
+ * The only difference from {@link DefaultHoodieRecordPayload} is that only the non-null
+ * fields of the latest record are applied to the existing record, instead of all fields.
+ *
+ * <p> Assume a {@link GenericRecord} has three int fields: a, b, c. The first record's
+ * values are: 1, 2, 3. The second record's values are: 4, 5, null (field c is null).
+ * After calling the combineAndGetUpdateValue method, the final record values are: 4, 5, 3;
+ * field c is not overwritten because its value is null in the latest record.
+ */
+public class PartialUpdateWithLatestAvroPayload extends DefaultHoodieRecordPayload {
+

Review comment:
       Hi @danny0405 , if I understand correctly, the `preCombine` method is called during record deduplication by `flushBucket` or `flushRemaining` in `StreamWriteFunction`, and it only deduplicates the records (newly created/updated records) in the buffer.
   
   If we only override `preCombine`, only the records in the buffer will be updated/merged; but if a record with the same recordKey already exists in a base/log file, that record will be overwritten by the new merged record from the buffer, right?
   
   For example, in COW mode, we might still need to override the `combineAndGetUpdateValue` method, because it is called by `HoodieMergeHandle.write(GenericRecord oldRecord)`, which merges the new merged record with the old record from storage.
   ```java
   public void write(GenericRecord oldRecord) {
     String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
     boolean copyOldRecord = true;
     if (keyToNewRecords.containsKey(key)) {
       // If we have duplicate records that we are updating, then the hoodie record will be deflated after
       // writing the first record. So make a copy of the record to be merged
       HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
       try {
         Option<IndexedRecord> combinedAvroRecord =
             hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
               useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
                 config.getPayloadConfig().getProps());

         if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
           // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
           copyOldRecord = true;
         } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {
           /*
            * ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
            * write the combined new value
            *
            * We no longer need to copy the old record over.
            */
           copyOldRecord = false;
         }
         writtenRecordKeys.add(key);
       } catch (Exception e) {
         throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {"
             + keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e);
       }
     }
   ```
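   For reference, the partial-update semantics the Javadoc above describes can be sketched in plain Java, using a `Map<String, Object>` in place of an Avro `GenericRecord` (the class `PartialUpdateSketch` and method `mergeNonNullFields` are illustrative assumptions, not Hudi or Avro API):

```java
import java.util.HashMap;
import java.util.Map;

public class PartialUpdateSketch {
  // Apply only the non-null fields of the latest record onto the old record,
  // mirroring the behavior described for PartialUpdateWithLatestAvroPayload.
  static Map<String, Object> mergeNonNullFields(Map<String, Object> oldRecord,
                                                Map<String, Object> latestRecord) {
    Map<String, Object> merged = new HashMap<>(oldRecord);
    for (Map.Entry<String, Object> e : latestRecord.entrySet()) {
      if (e.getValue() != null) {
        merged.put(e.getKey(), e.getValue()); // a null value keeps the old field
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> oldRecord = new HashMap<>();
    oldRecord.put("a", 1);
    oldRecord.put("b", 2);
    oldRecord.put("c", 3);

    Map<String, Object> latest = new HashMap<>();
    latest.put("a", 4);
    latest.put("b", 5);
    latest.put("c", null);

    // Field c stays 3 because it is null in the latest record.
    System.out.println(mergeNonNullFields(oldRecord, latest));
  }
}
```

   The point is that this merge has to run wherever old and new versions of a record actually meet: in the buffer (`preCombine`) and against storage (`combineAndGetUpdateValue`).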




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
