wombatu-kun commented on code in PR #18375:
URL: https://github.com/apache/hudi/pull/18375#discussion_r3179817185


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -135,19 +139,59 @@ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, B
      if (previousRecord == null) {
        // special case for payloads when there is no previous record
        HoodieSchema recordSchema = readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
-        GenericRecord record = readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(), recordSchema);
-        HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, HoodieRecordUtils.loadPayload(payloadClass, record, mergedRecord.getOrderingValue()));
-        try {
-          if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
-            return null;
-          } else {
-            HoodieSchema readerSchema = readerContext.getSchemaHandler().getRequestedSchema();
-            // If the record schema is different from the reader schema, rewrite the record using the payload methods to ensure consistency with legacy writer paths
-            hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties, readerSchema).toIndexedRecord(readerSchema, properties)
-                .ifPresent(rewrittenRecord -> mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+        GenericRecord originalAvro = mergedRecord.getOriginalAvroRecord();
+        Schema recordAvroSchema = recordSchema.toAvroSchema();
+
+        // When the merged record carries an originalAvroRecord (populated by extractDataFromRecord
+        // for ExpressionPayload in the COW write path via ExtractedData), the record is already in
+        // write-schema format with correctly evaluated expressions. Convert directly and skip the
+        // payload path.
+        //
+        // NOTE: this branch bypasses shouldIgnore. That is safe today because the only payload that
+        // populates originalAvroRecord is ExpressionPayload, which never returns shouldIgnore=true.
+        // If a future payload starts producing an originalAvroRecord, it must add a shouldIgnore
+        // check here.
+        if (originalAvro != null) {
+          // After replaceRecord(), mergedRecord.getSchemaId() still references the original schema.
+          // This is safe because the record is emitted immediately via super.handleNonDeletes() below,
+          // which calls seal() and produces the output row — the record is not spilled to disk
+          // (via toBinary()) between replaceRecord and emit in this single-record path. If this
+          // assumption changes, the schemaId must be updated after replaceRecord.

Review Comment:
   Good catch; the chain you describe is real: `CallbackProcessor.processUpdate` calls `callback.onInsert` after the delegate returns, and `CDCCallback.convertOutput` resolves the schema via `getSchemaFromBufferRecord(record)`, which reads the stale `schemaId`. A data-only `InternalRow` would therefore be serialized against the write schema (with meta fields) and fail with an arity mismatch.
   
   On coverage: I don't have a test in this PR's matrix that exercises the SQL `MERGE INTO` + `ExpressionPayload` + COW + `hoodie.table.cdc.enabled=true` combination; Lance is COW-only and has no CDC coverage yet.
   
   The NOTE block in this method (lines 155-159) deliberately scopes the safety 
claim to "the record is not spilled to disk between replaceRecord and emit in 
this single-record path" — the CDC callback fires *after* `processUpdate` 
returns, so it's exactly the "if this assumption changes" case. Since this PR's 
scope is Lance + Spark-SQL parity, I'd prefer to track this as a follow-up: 
file a separate issue to (a) update `schemaId` after `replaceRecord` to the 
data-only schema id and (b) add a CDC + MERGE INTO regression test. Happy to 
open that issue and link it here if you'd like.
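   To make the follow-up concrete, here's a minimal, self-contained sketch of why fix (a) matters. The classes below are toy stand-ins, not Hudi's actual `BufferedRecord` or CDC callback: a consumer that resolves the schema from the record's `schemaId` after `replaceRecord()` swapped in a data-only row sees the wrong arity until the id is updated too.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class StaleSchemaIdSketch {
     // Toy schema ids: the write schema has 8 columns (meta + data),
     // the data-only schema has 3. Ids and arities are made up.
     static final int WRITE_SCHEMA_ID = 1;
     static final int DATA_SCHEMA_ID = 2;

     static final Map<Integer, Integer> SCHEMA_ARITY = new HashMap<>();
     static {
       SCHEMA_ARITY.put(WRITE_SCHEMA_ID, 8);
       SCHEMA_ARITY.put(DATA_SCHEMA_ID, 3);
     }

     static int arity(int schemaId) {
       return SCHEMA_ARITY.get(schemaId);
     }

     /** Stand-in for a buffered record: a row plus the id of its schema. */
     static class Buffered {
       Object[] row;
       int schemaId;

       Buffered(Object[] row, int schemaId) {
         this.row = row;
         this.schemaId = schemaId;
       }

       /** Swaps the row without touching schemaId, mirroring the bug under discussion. */
       void replaceRecord(Object[] newRow) {
         this.row = newRow;
       }
     }

     /** Stand-in for a CDC-style serializer: only succeeds if row arity matches the schema. */
     static boolean serializesCleanly(Buffered r) {
       return r.row.length == arity(r.schemaId);
     }

     public static void main(String[] args) {
       Buffered rec = new Buffered(new Object[8], WRITE_SCHEMA_ID);
       // Emulate the originalAvro branch: replace with a data-only row.
       rec.replaceRecord(new Object[3]);
       // Stale schemaId: a post-return consumer hits the arity mismatch.
       System.out.println(serializesCleanly(rec)); // false
       // Fix (a): update schemaId alongside replaceRecord.
       rec.schemaId = DATA_SCHEMA_ID;
       System.out.println(serializesCleanly(rec)); // true
     }
   }
   ```

   The single-record read path doesn't hit this because the row is emitted before anything re-resolves the schema; the CDC callback is the first consumer that does.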



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
