rahil-c commented on code in PR #18375:
URL: https://github.com/apache/hudi/pull/18375#discussion_r3069731776
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -21,25 +21,88 @@ package org.apache.hudi
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.common.engine.RecordContext
+import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hudi.SparkAdapter
+import java.io.IOException
+import java.util.Properties
import scala.collection.mutable
trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContext {
  lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
  private val deserializerMap: mutable.Map[HoodieSchema, HoodieAvroDeserializer] = mutable.Map()
  private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] = mutable.Map()
+  // Maps InternalRow instances (by identity) to their original Avro records when the Avro record's
+  // schema differs from the BufferedRecord schema. This handles ExpressionPayload records whose
+  // getInsertValue result carries the data schema (no meta fields) while the BufferedRecord
+  // stores writeSchemaWithMetaFields. Returning the original Avro record from convertToAvroRecord
+  // lets ExpressionPayload.combineAndGetUpdateValue decode bytes with the correct data schema.
+  //
+  // IMPORTANT INVARIANT: The identity link between InternalRow and GenericRecord must survive
+  // from extractDataFromRecord (where the cache is populated) to convertToAvroRecord (where
+  // it is consumed via remove()). Any operation that replaces the InternalRow object in between
Review Comment:
**[Critical]** The `IdentityHashMap<InternalRow, GenericRecord>` cache
relies on object identity surviving from `extractDataFromRecord` to
`convertToAvroRecord`. Any intermediate `InternalRow.copy()`, `seal()`,
`replaceRecord()`, or `project()` silently breaks the link and falls back to
schema-based serialization — which produces wrong results for ExpressionPayload
(wrong schema used for decode).
The extensive comments documenting this invariant are actually evidence of
how fragile it is. Consider either:
1. A wrapper type that carries the GenericRecord explicitly (e.g. a
`TaggedInternalRow`), or
2. Keying by record key string instead of object identity
At minimum, add a debug assertion or metric when the cache is non-empty
after a FileGroup read completes, so violations are caught rather than silently
producing wrong data.
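A minimal sketch of option 1, in plain Java with stand-in types (`TaggedRow` and its type parameters are hypothetical names, not Hudi or Spark APIs): the original Avro record travels inside the value itself, so no identity-keyed side cache and no identity invariant are needed.

```java
import java.util.Optional;

// Hypothetical wrapper: pairs an engine-native row with the original Avro
// record it was derived from, instead of linking them via an IdentityHashMap.
final class TaggedRow<R, A> {
  private final R row;            // the engine-native row
  private final A originalRecord; // original Avro record, or null if none

  TaggedRow(R row, A originalRecord) {
    this.row = row;
    this.originalRecord = originalRecord;
  }

  R row() {
    return row;
  }

  // Present only when the row came from a payload whose schema differs from
  // the buffered-record schema (the ExpressionPayload case in the comment).
  Optional<A> originalRecord() {
    return Optional.ofNullable(originalRecord);
  }

  // copy()/project()-style operations replace the row but must go through
  // this method, so the original record is preserved explicitly rather than
  // by object identity.
  TaggedRow<R, A> withRow(R newRow) {
    return new TaggedRow<>(newRow, originalRecord);
  }
}
```

With this shape, the suggested "cache non-empty after a FileGroup read" assertion becomes unnecessary, since there is no cache whose entries can be orphaned.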
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -136,18 +140,54 @@ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, B
Review Comment:
**[High]** This branch uses `record.getSchema() != recordAvroSchema` as a
heuristic to detect cached ExpressionPayload records. But schema inequality can
also occur during legitimate schema evolution (e.g., a reader schema that
differs from the writer schema). This heuristic conflates two unrelated
conditions.
Could we instead use an explicit signal (e.g. a boolean flag from the
RecordContext indicating the record came from the cache) rather than inferring
intent from schema comparison?
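A sketch of what that explicit signal could look like, using hypothetical stand-in types (`BufferedEntry`, `fromRecordCache`, and `schemaId` are illustrative names, not actual Hudi fields): the flag is set at the one place the record cache is consulted, so the update path never infers provenance from schema comparison.

```java
// Hypothetical buffered-record holder carrying an explicit provenance flag.
final class BufferedEntry<T> {
  final T data;
  final String schemaId;         // stand-in for the record's Avro schema
  final boolean fromRecordCache; // set exactly where the cache is read

  BufferedEntry(T data, String schemaId, boolean fromRecordCache) {
    this.data = data;
    this.schemaId = schemaId;
    this.fromRecordCache = fromRecordCache;
  }

  // Explicit flag: a reader schema that legitimately differs from the writer
  // schema (ordinary schema evolution) no longer trips this branch, because
  // the decision is decoupled from any schema-identity check.
  boolean needsOriginalSchemaDecode() {
    return fromRecordCache;
  }
}
```

The flag would presumably be threaded through from the RecordContext that populated the cache, keeping the two unrelated conditions (cached ExpressionPayload record vs. schema evolution) distinguishable at the call site.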
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]