danny0405 commented on code in PR #13699:
URL: https://github.com/apache/hudi/pull/13699#discussion_r2272721268


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -80,16 +106,79 @@ public BufferedRecord<T> processUpdate(String recordKey, 
BufferedRecord<T> previ
         }
         return null;
       } else {
-        T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
-        T mergedRow = mergedRecord.getRecord();
-        if (prevRow != null && prevRow != mergedRow) {
-          mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
-          readStats.incrementNumUpdates();
-        } else if (prevRow == null) {
-          mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
-          readStats.incrementNumInserts();
+        return handleNonDeletes(previousRecord, mergedRecord);
+      }
+    }
+
+    protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> 
previousRecord, BufferedRecord<T> mergedRecord) {
+      T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
+      T mergedRow = mergedRecord.getRecord();
+      if (prevRow != null && prevRow != mergedRow) {
+        mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
+        readStats.incrementNumUpdates();
+      } else if (prevRow == null) {
+        mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
+        readStats.incrementNumInserts();
+      }
+      return mergedRecord.seal(readerContext);
+    }
+  }
+
+  class PayloadUpdateProcessor<T> extends StandardUpdateProcessor<T> {
+    private final String payloadClass;
+    private final Properties properties;
+
+    public PayloadUpdateProcessor(HoodieReadStats readStats, 
HoodieReaderContext<T> readerContext, boolean emitDeletes,
+                                  Properties properties, String payloadClass) {
+      super(readStats, readerContext, emitDeletes);
+      this.payloadClass = payloadClass;
+      this.properties = properties;
+    }
+
+    @Override
+    protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> 
previousRecord, BufferedRecord<T> mergedRecord) {
+      if (previousRecord == null) {
+        // special case for payloads when there is no previous record
+        Schema recordSchema = 
readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
+        GenericRecord record = 
readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(), 
recordSchema);
+        HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, 
HoodieRecordUtils.loadPayload(payloadClass, record, 
mergedRecord.getOrderingValue()));
+        try {
+          if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
+            return null;
+          } else {
+            Schema readerSchema = 
readerContext.getSchemaHandler().getRequestedSchema();
+            // If the record schema is different from the reader schema, 
rewrite the record using the payload methods to ensure consistency with legacy 
writer paths
+            hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties, 
readerSchema).toIndexedRecord(readerSchema, properties)
+                .ifPresent(rewrittenRecord -> 
mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+          }
+        } catch (IOException e) {
+          throw new HoodieIOException("Error processing record with payload 
class: " + payloadClass, e);
+        }
+      }
+      return super.handleNonDeletes(previousRecord, mergedRecord);
+    }
+  }
+
+  class CustomMergerUpdateProcessor<T> extends StandardUpdateProcessor<T> {
+    private final HoodieRecordMerger merger;
+    private final TypedProperties properties;
+
+    CustomMergerUpdateProcessor(HoodieReadStats readStats, 
HoodieReaderContext<T> readerContext, boolean emitDeletes,
+                                TypedProperties properties) {
+      super(readStats, readerContext, emitDeletes);
+      this.merger = readerContext.getRecordMerger().get();
+      this.properties = properties;
+    }
+
+    @Override
+    protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> 
previousRecord, BufferedRecord<T> mergedRecord) {
+      try {
+        if 
(merger.shouldFlush(readerContext.getRecordContext().constructHoodieRecord(mergedRecord),
 readerContext.getRecordContext().getSchemaFromBufferRecord(mergedRecord), 
properties)) {

Review Comment:
   JIRA created to track the follow-up fixes: 
https://issues.apache.org/jira/browse/HUDI-9709



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to