yihua commented on code in PR #18375:
URL: https://github.com/apache/hudi/pull/18375#discussion_r3036297461


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -136,18 +140,49 @@ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, B
         // special case for payloads when there is no previous record
         HoodieSchema recordSchema = readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
         GenericRecord record = readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(), recordSchema);
-        HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, HoodieRecordUtils.loadPayload(payloadClass, record, mergedRecord.getOrderingValue()));
-        try {
-          if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
-            return null;
-          } else {
-            HoodieSchema readerSchema = readerContext.getSchemaHandler().getRequestedSchema();
-            // If the record schema is different from the reader schema, rewrite the record using the payload methods to ensure consistency with legacy writer paths
-            hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties, readerSchema).toIndexedRecord(readerSchema, properties)
-                .ifPresent(rewrittenRecord -> mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+        Schema recordAvroSchema = recordSchema.toAvroSchema();
+
+        // If convertToAvroRecord returned a cached record with a different schema (e.g., from
+        // extractDataFromRecord caching for ExpressionPayload in the COW write path), the record
+        // is already in write-schema format with correctly evaluated expressions. Convert directly.
+        // Note: SENTINEL records (used by ExpressionPayload to signal "skip this record") always
+        // have null schema (HoodieRecord.EmptyRecord.getSchema() returns null), so they cannot
+        // enter this branch and will always go through the payload path where shouldIgnore handles them.
+        if (record.getSchema() != null && !record.getSchema().equals(recordAvroSchema)) {
+          mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(record));
+        } else {
+          HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, HoodieRecordUtils.loadPayload(payloadClass, record, mergedRecord.getOrderingValue()));
+          try {
+            if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
+              return null;
+            }
+            // Evaluate the payload to get the insert value
+            Option<IndexedRecord> insertValueOpt = hoodieRecord.getData().getInsertValue(recordAvroSchema, properties);
+            if (insertValueOpt.isPresent()) {
+              GenericRecord insertRecord = (GenericRecord) insertValueOpt.get();
+              HoodieSchema readerSchema = readerContext.getSchemaHandler().getRequestedSchema();

Review Comment:
   🤖 When entering this branch (cached Avro record has a different schema than 
`recordAvroSchema`), the record is converted and placed into `mergedRecord` via 
`replaceRecord`, but `mergedRecord.getSchemaId()` still refers to the original 
schema. If this BufferedRecord is later spilled to disk via `toBinary()`, it 
will serialize using the stale schemaId's schema, which won't match the actual 
InternalRow structure. Is the expectation that these records are never spilled?
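   To make the concern concrete, here is a minimal stand-in sketch (simplified placeholder types, not the actual Hudi `BufferedRecord`/`toBinary()` implementation): a `replaceRecord` overload that re-encodes the schema id at replacement time would keep the payload and schemaId consistent if such records can in fact be spilled.

```java
// Stand-in sketch only: BufferedRecord here is a simplified placeholder,
// not org.apache.hudi.common.table.read.BufferedRecord.
public class SchemaIdSketch {

  static class BufferedRecord {
    Object record;
    int schemaId;

    BufferedRecord(Object record, int schemaId) {
      this.record = record;
      this.schemaId = schemaId;
    }

    // Replacing the payload alone leaves schemaId pointing at the old schema;
    // a later spill that serializes by schemaId would mismatch the new payload.
    void replaceRecord(Object newRecord) {
      this.record = newRecord;
    }

    // Hypothetical variant that keeps schemaId in sync with the new payload.
    void replaceRecord(Object newRecord, int newSchemaId) {
      this.record = newRecord;
      this.schemaId = newSchemaId;
    }
  }

  public static void main(String[] args) {
    BufferedRecord rec = new BufferedRecord("row-in-data-schema", 1);
    rec.replaceRecord("row-in-write-schema");
    System.out.println("after payload-only replace, schemaId = " + rec.schemaId);
    rec.replaceRecord("row-in-write-schema", 2);
    System.out.println("after schema-aware replace, schemaId = " + rec.schemaId);
  }
}
```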



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -396,24 +399,51 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithEx
     }
 
     //record is inserted or updated
-    String partitionPath = inferPartitionPath(incoming, existing, writeSchemaWithMetaFields, keyGenerator, existingRecordContext, mergeResult);
-    HoodieRecord<R> result = existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
+    String partitionPath = inferPartitionPath(incoming, existing, writeSchemaWithMetaFields, keyGenerator,
+        existingRecordContext, mergeResult, incomingBufferedRecord, existingBufferedRecord, incomingRecordContext);
+    // When HoodieAvroRecordMerger creates a genuinely new BufferedRecord, it encodes the schema into
+    // incomingRecordContext. Re-encode into existingRecordContext so constructHoodieRecord (which uses
+    // existingRecordContext for the correct payload class) can resolve the schema for SPARK records.
+    BufferedRecord<R> mergeResultForConstruct = mergeResult;
+    if (mergeResult != incomingBufferedRecord && mergeResult != existingBufferedRecord) {
+      HoodieSchema mergedSchema = incomingRecordContext.getSchemaFromBufferRecord(mergeResult);
+      if (mergedSchema != null && existingRecordContext.getSchemaFromBufferRecord(mergeResult) == null) {

Review Comment:
   🤖 The `requiresSparkRecordType()` check-and-override pattern now appears in 
6+ locations across `HoodieIndexUtils`, `HoodieReadHandle`, `HoodieTable`, and 
`HoodieFileWriterFactory`. Could this be centralized — e.g., a utility method 
like `HoodieFileFormat.resolveRecordType(HoodieRecordType configuredType)` — to 
avoid missing a site when a new format with the same constraint is added?
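   A minimal sketch of the suggested shape, using stand-in enums (the real `HoodieFileFormat`/`HoodieRecordType` signatures may differ): each format answers whether it forces the SPARK record type, so call sites resolve once instead of repeating the check.

```java
// Hypothetical centralization of the scattered requiresSparkRecordType() checks.
// HoodieFileFormat and HoodieRecordType are stand-in enums for illustration.
public class RecordTypeResolver {

  enum HoodieRecordType { AVRO, SPARK }

  enum HoodieFileFormat {
    PARQUET(false), ORC(false), LANCE(true);

    private final boolean requiresSparkRecordType;

    HoodieFileFormat(boolean requiresSparkRecordType) {
      this.requiresSparkRecordType = requiresSparkRecordType;
    }

    // Single place that answers "which record type should actually be used?":
    // formats that only support SPARK records override the configured type.
    HoodieRecordType resolveRecordType(HoodieRecordType configuredType) {
      return requiresSparkRecordType ? HoodieRecordType.SPARK : configuredType;
    }
  }

  public static void main(String[] args) {
    // LANCE forces SPARK even when AVRO was configured; PARQUET keeps the config.
    System.out.println(HoodieFileFormat.LANCE.resolveRecordType(HoodieRecordType.AVRO));
    System.out.println(HoodieFileFormat.PARQUET.resolveRecordType(HoodieRecordType.AVRO));
  }
}
```

   Adding a new format with the same constraint then only requires setting the flag in one place.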



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -251,8 +251,11 @@ public static Collection<Pair<String, Long>> filterKeysFromFile(StoragePath file
       return Collections.emptyList();
     }
     log.info("Going to filter {} keys from file {}", candidateRecordKeys.size(), filePath);

Review Comment:
   🤖 This always falls back to AVRO for non-Lance formats, ignoring the record 
merger's configured record type. The old code was hardcoded to AVRO too, so 
this isn't a regression, but is this intentional? If someone configures a 
SPARK-type merger for Parquet, this would still open an AVRO reader for key 
filtering.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -21,25 +21,84 @@ package org.apache.hudi
 
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hudi.common.engine.RecordContext
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.HoodieInternalRowUtils
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hudi.SparkAdapter
 
+import java.io.IOException
+import java.util.Properties
 import scala.collection.mutable
 
 trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContext {
 
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
   private val deserializerMap: mutable.Map[HoodieSchema, HoodieAvroDeserializer] = mutable.Map()
   private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] = mutable.Map()
+  // Maps InternalRow instances (by identity) to their original Avro records when the Avro record's
+  // schema differs from the BufferedRecord schema. This handles ExpressionPayload records whose
+  // getInsertValue result carries the data schema (no meta fields) while the BufferedRecord
+  // stores writeSchemaWithMetaFields. Returning the original Avro record from convertToAvroRecord
+  // lets ExpressionPayload.combineAndGetUpdateValue decode bytes with the correct data schema.
+  //
+  // IMPORTANT INVARIANT: The identity link between InternalRow and GenericRecord must survive
+  // from extractDataFromRecord (where the cache is populated) to convertToAvroRecord (where
+  // it is consumed via remove()). Any operation that replaces the InternalRow object in between
+  // — such as BufferedRecord.seal() (which calls InternalRow.copy()), replaceRecord(), or
+  // project() — would break this link and cause fallback to schema-based serialization.

Review Comment:
   🤖 The `avroRecordByRow` IdentityHashMap entries are only cleaned up via 
`remove()` in `convertToAvroRecord`. If a record is cached here but the code 
path skips `convertToAvroRecord` (e.g., the record is filtered, deleted, or 
ignored before conversion), the entry leaks. Under sustained MOR reads with 
ExpressionPayload, this could accumulate unbounded entries for the lifetime of 
this context. Could you add a periodic or batch cleanup, or at minimum document 
the expected lifecycle bound of this object?
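   One possible mitigation, sketched with a generic wrapper (the cap and its value are invented for illustration, not Hudi API): clear the identity cache when it grows past a bound, accepting a fallback to schema-based serialization for any dropped entries rather than unbounded growth.

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Sketch of a size-capped identity cache; maxEntries is an invented knob.
// Clearing drops entries whose InternalRow never reached convertToAvroRecord
// (filtered/deleted/ignored records), which would otherwise leak.
public class BoundedIdentityCache<K, V> {
  private final int maxEntries;
  private final Map<K, V> cache = new IdentityHashMap<>();

  public BoundedIdentityCache(int maxEntries) {
    this.maxEntries = maxEntries;
  }

  public void put(K key, V value) {
    // Losing a cached entry is correct but slower (schema-based fallback),
    // so a coarse "clear everything at the cap" policy is safe here.
    if (cache.size() >= maxEntries) {
      cache.clear();
    }
    cache.put(key, value);
  }

  public V remove(K key) {
    return cache.remove(key);
  }

  public int size() {
    return cache.size();
  }
}
```

   Whether a cap, per-batch clearing, or simply documenting the context's lifecycle bound is the right call depends on how long a single record context actually lives.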



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -21,25 +21,73 @@ package org.apache.hudi
 
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hudi.common.engine.RecordContext
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.HoodieInternalRowUtils
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hudi.SparkAdapter
 
+import java.io.IOException
+import java.util.Properties
 import scala.collection.mutable
 
 trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContext {
 
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
   private val deserializerMap: mutable.Map[HoodieSchema, HoodieAvroDeserializer] = mutable.Map()
   private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] = mutable.Map()
+  // Maps InternalRow instances (by identity) to their original Avro records when the Avro record's

Review Comment:
   🤖 I'm concerned that `IdentityHashMap` is not thread-safe, and since 
`FIELD_ACCESSOR_INSTANCE` is a static singleton, multiple Spark tasks will 
share it concurrently on the same executor. Also, 
`HoodieSparkRecord.toIndexedRecord` hardcodes the use of the singleton, while 
`ReaderContext` uses a per-task instance — so the cache populated by one won't 
be visible to the other. Additionally, if a record is extracted but never 
converted (e.g. filtered out during merge), its entry might leak in the map and 
cause memory pressure in long-running executors.
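   For the thread-safety part specifically, one common pattern is to confine the cache per thread, sketched below with a generic wrapper (illustrative only; it does not address the singleton-vs-per-task visibility split, which needs the two call sites to share one instance):

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Sketch: each Spark task thread gets its own IdentityHashMap, so concurrent
// tasks on the same executor no longer race on a shared, non-thread-safe map.
public class ThreadLocalIdentityCache<K, V> {
  private final ThreadLocal<Map<K, V>> cache =
      ThreadLocal.withInitial(IdentityHashMap::new);

  public void put(K key, V value) {
    cache.get().put(key, value);
  }

  public V remove(K key) {
    return cache.get().remove(key);
  }

  // Tasks should call this in a finally block; executor threads are pooled
  // and reused, so stale per-thread entries would otherwise survive the task.
  public void clearForCurrentThread() {
    cache.remove();
  }
}
```

   This trades the shared-map races for a per-thread leak hazard, which is why the explicit clear on task completion matters.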



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestQueryMergeOnReadOptimizedTable.scala:
##########
@@ -19,63 +19,74 @@
 
 package org.apache.spark.sql.hudi.feature
 
+import org.apache.hudi.HoodieSparkUtils
+
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 
 class TestQueryMergeOnReadOptimizedTable extends HoodieSparkSqlTestBase {
-  test("Test Query Merge_On_Read Read_Optimized table") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
-      // create table
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long,
-           |  partition long
-           |) using hudi
-           | partitioned by (partition)
-           | location '$tablePath'
-           | tblproperties (
-           |  type = 'mor',
-           |  primaryKey = 'id',
-           |  orderingFields = 'ts'
-           | )
-       """.stripMargin)
-      // insert data to table
-      withSQLConf("hoodie.parquet.max.file.size" -> "10000") {
-        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
-        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 1000)")
-        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, 1000)")
-        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000, 1000)")
-        spark.sql(s"update $tableName set price = 11 where id = 1")
-        spark.sql(s"update $tableName set price = 21 where id = 2")
-        spark.sql(s"update $tableName set price = 31 where id = 3")
-        spark.sql(s"update $tableName set price = 41 where id = 4")
 
-        // expect that all complete parquet files can be scanned
-        assertQueryResult(4, tablePath)
+  val baseFileFormats: List[String] = if (HoodieSparkUtils.gteqSpark3_4) List("parquet", "lance") else List("parquet")
+
+  baseFileFormats.foreach { baseFileFormat =>
+    test(s"Test Query Merge_On_Read Read_Optimized table - $baseFileFormat") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        // create table
+        // No explicit hoodie.write.record.merge.custom.implementation.classes needed:
+        // orderingFields='ts' triggers EVENT_TIME_ORDERING merge mode, which auto-selects
+        // DefaultSparkRecordMerger for lance (SPARK record type).
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  partition long
+             |) using hudi
+             | partitioned by (partition)
+             | location '$tablePath'
+             | tblproperties (
+             |  type = 'mor',
+             |  primaryKey = 'id',
+             |  orderingFields = 'ts',
+             |  hoodie.base.file.format = '$baseFileFormat'
+             | )
+         """.stripMargin)
+        // insert data to table
+        withSQLConf("hoodie.parquet.max.file.size" -> "10000") {
+          spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+          spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 1000)")
+          spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, 1000)")
+          spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000, 1000)")

Review Comment:
   🤖 This uses `hoodie.base.file.format` as the tblproperty key, but 
`TestSparkSqlCoreFlow` uses `hoodie.table.base.file.format`. Are both valid? If 
one is incorrect, the lance tests would silently fall back to parquet and pass 
without actually exercising the lance code path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
