wzx140 commented on code in PR #6745:
URL: https://github.com/apache/hudi/pull/6745#discussion_r992399805


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java:
##########
@@ -75,24 +75,31 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig 
config, String instantTi
         IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, 
HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp)
+  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, 
HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema 
writerSchema)
       throws IOException {
-    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, 
combineRecordOp);
+    Option<HoodieRecord> savedCombineRecordOp = 
combineRecordOp.map(HoodieRecord::newInstance);
+    HoodieRecord<T> savedOldRecord = oldRecord.newInstance();
+    final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord, 
combineRecordOp, writerSchema);
     if (result) {
       boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
-      cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord) 
oldRecord).getData(), isDelete ? Option.empty() : combineRecordOp.map(rec -> 
((HoodieAvroIndexedRecord) rec).getData()));
+      Option<IndexedRecord> combineRecord;
+      if (combineRecordOp.isPresent()) {
+        combineRecord = 
savedCombineRecordOp.get().toIndexedRecord(writerSchema, 
config.getPayloadConfig().getProps()).map(HoodieAvroIndexedRecord::getData);
+      } else {
+        combineRecord = Option.empty();
+      }
+      cdcLogger.put(hoodieRecord, (GenericRecord) ((HoodieAvroIndexedRecord) 
savedOldRecord).getData(), isDelete ? Option.empty() : combineRecord);
+      hoodieRecord.deflate();
     }
     return result;
   }
 
-  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws 
IOException {
-    // Get the data before deflated
-    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : 
tableSchema;
-    Option<IndexedRecord> recordOption = hoodieRecord.toIndexedRecord(schema, 
this.config.getProps())
-        .map(HoodieRecord::getData);
-    super.writeInsertRecord(hoodieRecord);
+  protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Schema 
schema) throws IOException {
+    HoodieRecord<T> savedRecord = hoodieRecord.newInstance();

Review Comment:
   Yes. But how can we do the copying inside the CDCLogger itself. We only 
call`cdcLogger.put` when `result` is true.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to