yihua commented on code in PR #18375:
URL: https://github.com/apache/hudi/pull/18375#discussion_r3036297461
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -136,18 +140,49 @@ protected BufferedRecord<T>
handleNonDeletes(BufferedRecord<T> previousRecord, B
// special case for payloads when there is no previous record
HoodieSchema recordSchema =
readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
GenericRecord record =
readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(),
recordSchema);
- HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null,
HoodieRecordUtils.loadPayload(payloadClass, record,
mergedRecord.getOrderingValue()));
- try {
- if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
- return null;
- } else {
- HoodieSchema readerSchema =
readerContext.getSchemaHandler().getRequestedSchema();
- // If the record schema is different from the reader schema,
rewrite the record using the payload methods to ensure consistency with legacy
writer paths
- hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties,
readerSchema).toIndexedRecord(readerSchema, properties)
- .ifPresent(rewrittenRecord ->
mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+ Schema recordAvroSchema = recordSchema.toAvroSchema();
+
+ // If convertToAvroRecord returned a cached record with a different
schema (e.g., from
+ // extractDataFromRecord caching for ExpressionPayload in the COW
write path), the record
+ // is already in write-schema format with correctly evaluated
expressions. Convert directly.
+ // Note: SENTINEL records (used by ExpressionPayload to signal "skip
this record") always
+ // have null schema (HoodieRecord.EmptyRecord.getSchema() returns
null), so they cannot
+ // enter this branch and will always go through the payload path where
shouldIgnore handles them.
+ if (record.getSchema() != null &&
!record.getSchema().equals(recordAvroSchema)) {
+
mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(record));
+ } else {
+ HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null,
HoodieRecordUtils.loadPayload(payloadClass, record,
mergedRecord.getOrderingValue()));
+ try {
+ if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
+ return null;
+ }
+ // Evaluate the payload to get the insert value
+ Option<IndexedRecord> insertValueOpt =
hoodieRecord.getData().getInsertValue(recordAvroSchema, properties);
+ if (insertValueOpt.isPresent()) {
+ GenericRecord insertRecord = (GenericRecord)
insertValueOpt.get();
+ HoodieSchema readerSchema =
readerContext.getSchemaHandler().getRequestedSchema();
Review Comment:
🤖 When entering this branch (cached Avro record has a different schema than
`recordAvroSchema`), the record is converted and placed into `mergedRecord` via
`replaceRecord`, but `mergedRecord.getSchemaId()` still refers to the original
schema. If this BufferedRecord is later spilled to disk via `toBinary()`, it
will serialize using the stale schemaId's schema, which won't match the actual
InternalRow structure. Is the expectation that these records are never spilled?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -396,24 +399,51 @@ private static <R> Option<HoodieRecord<R>>
mergeIncomingWithExistingRecordWithEx
}
//record is inserted or updated
- String partitionPath = inferPartitionPath(incoming, existing,
writeSchemaWithMetaFields, keyGenerator, existingRecordContext, mergeResult);
- HoodieRecord<R> result =
existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
+ String partitionPath = inferPartitionPath(incoming, existing,
writeSchemaWithMetaFields, keyGenerator,
+ existingRecordContext, mergeResult, incomingBufferedRecord,
existingBufferedRecord, incomingRecordContext);
+ // When HoodieAvroRecordMerger creates a genuinely new BufferedRecord, it
encodes the schema into
+ // incomingRecordContext. Re-encode into existingRecordContext so
constructHoodieRecord (which uses
+ // existingRecordContext for the correct payload class) can resolve the
schema for SPARK records.
+ BufferedRecord<R> mergeResultForConstruct = mergeResult;
+ if (mergeResult != incomingBufferedRecord && mergeResult !=
existingBufferedRecord) {
+ HoodieSchema mergedSchema =
incomingRecordContext.getSchemaFromBufferRecord(mergeResult);
+ if (mergedSchema != null &&
existingRecordContext.getSchemaFromBufferRecord(mergeResult) == null) {
Review Comment:
🤖 The `requiresSparkRecordType()` check-and-override pattern now appears in
6+ locations across `HoodieIndexUtils`, `HoodieReadHandle`, `HoodieTable`, and
`HoodieFileWriterFactory`. Could this be centralized — e.g., a utility method
like `HoodieFileFormat.resolveRecordType(HoodieRecordType configuredType)` — to
avoid missing a site when a new format with the same constraint is added?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -251,8 +251,11 @@ public static Collection<Pair<String, Long>>
filterKeysFromFile(StoragePath file
return Collections.emptyList();
}
log.info("Going to filter {} keys from file {}",
candidateRecordKeys.size(), filePath);
Review Comment:
🤖 This always falls back to AVRO for non-Lance formats, ignoring the record
merger's configured record type. The old code was hardcoded to AVRO too, so
this isn't a regression, but is this intentional? If someone configures a
SPARK-type merger for Parquet, this would still open an AVRO reader for key
filtering.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -21,25 +21,84 @@ package org.apache.hudi
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.common.engine.RecordContext
+import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hudi.SparkAdapter
+import java.io.IOException
+import java.util.Properties
import scala.collection.mutable
trait SparkFileFormatInternalRecordContext extends
BaseSparkInternalRecordContext {
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
private val deserializerMap: mutable.Map[HoodieSchema,
HoodieAvroDeserializer] = mutable.Map()
private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] =
mutable.Map()
+ // Maps InternalRow instances (by identity) to their original Avro records
when the Avro record's
+ // schema differs from the BufferedRecord schema. This handles
ExpressionPayload records whose
+ // getInsertValue result carries the data schema (no meta fields) while the
BufferedRecord
+ // stores writeSchemaWithMetaFields. Returning the original Avro record from
convertToAvroRecord
+ // lets ExpressionPayload.combineAndGetUpdateValue decode bytes with the
correct data schema.
+ //
+ // IMPORTANT INVARIANT: The identity link between InternalRow and
GenericRecord must survive
+ // from extractDataFromRecord (where the cache is populated) to
convertToAvroRecord (where
+ // it is consumed via remove()). Any operation that replaces the InternalRow
object in between
+ // — such as BufferedRecord.seal() (which calls InternalRow.copy()),
replaceRecord(), or
+ // project() — would break this link and cause fallback to schema-based
serialization.
Review Comment:
🤖 The `avroRecordByRow` IdentityHashMap entries are only cleaned up via
`remove()` in `convertToAvroRecord`. If a record is cached here but the code
path skips `convertToAvroRecord` (e.g., the record is filtered, deleted, or
ignored before conversion), the entry leaks. Under sustained MOR reads with
ExpressionPayload, this could accumulate unbounded entries for the lifetime of
this context. Could you add a periodic or batch cleanup, or at minimum document
the expected lifecycle bound of this object?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -21,25 +21,73 @@ package org.apache.hudi
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.common.engine.RecordContext
+import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hudi.SparkAdapter
+import java.io.IOException
+import java.util.Properties
import scala.collection.mutable
trait SparkFileFormatInternalRecordContext extends
BaseSparkInternalRecordContext {
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
private val deserializerMap: mutable.Map[HoodieSchema,
HoodieAvroDeserializer] = mutable.Map()
private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] =
mutable.Map()
+ // Maps InternalRow instances (by identity) to their original Avro records
when the Avro record's
Review Comment:
🤖 I'm concerned that `IdentityHashMap` is not thread-safe, and since
`FIELD_ACCESSOR_INSTANCE` is a static singleton, multiple Spark tasks will
share it concurrently on the same executor. Also,
`HoodieSparkRecord.toIndexedRecord` hardcodes the use of the singleton, while
`ReaderContext` uses a per-task instance — so the cache populated by one won't
be visible to the other. Additionally, if a record is extracted but never
converted (e.g. filtered out during merge), its entry might leak in the map and
cause memory pressure in long-running executors.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestQueryMergeOnReadOptimizedTable.scala:
##########
@@ -19,63 +19,74 @@
package org.apache.spark.sql.hudi.feature
+import org.apache.hudi.HoodieSparkUtils
+
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
class TestQueryMergeOnReadOptimizedTable extends HoodieSparkSqlTestBase {
- test("Test Query Merge_On_Read Read_Optimized table") {
- withTempDir { tmp =>
- val tableName = generateTableName
- val tablePath = s"${tmp.getCanonicalPath}/$tableName"
- // create table
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long,
- | partition long
- |) using hudi
- | partitioned by (partition)
- | location '$tablePath'
- | tblproperties (
- | type = 'mor',
- | primaryKey = 'id',
- | orderingFields = 'ts'
- | )
- """.stripMargin)
- // insert data to table
- withSQLConf("hoodie.parquet.max.file.size" -> "10000") {
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
- spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 1000)")
- spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, 1000)")
- spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000, 1000)")
- spark.sql(s"update $tableName set price = 11 where id = 1")
- spark.sql(s"update $tableName set price = 21 where id = 2")
- spark.sql(s"update $tableName set price = 31 where id = 3")
- spark.sql(s"update $tableName set price = 41 where id = 4")
- // expect that all complete parquet files can be scanned
- assertQueryResult(4, tablePath)
+ val baseFileFormats: List[String] = if (HoodieSparkUtils.gteqSpark3_4)
List("parquet", "lance") else List("parquet")
+
+ baseFileFormats.foreach { baseFileFormat =>
+ test(s"Test Query Merge_On_Read Read_Optimized table - $baseFileFormat") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ // create table
+ // No explicit hoodie.write.record.merge.custom.implementation.classes
needed:
+ // orderingFields='ts' triggers EVENT_TIME_ORDERING merge mode, which
auto-selects
+ // DefaultSparkRecordMerger for lance (SPARK record type).
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | partition long
+ |) using hudi
+ | partitioned by (partition)
+ | location '$tablePath'
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | orderingFields = 'ts',
+ | hoodie.base.file.format = '$baseFileFormat'
+ | )
+ """.stripMargin)
+ // insert data to table
+ withSQLConf("hoodie.parquet.max.file.size" -> "10000") {
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 1000)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, 1000)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000, 1000)")
Review Comment:
🤖 This uses `hoodie.base.file.format` as the tblproperty key, but
`TestSparkSqlCoreFlow` uses `hoodie.table.base.file.format`. Are both valid? If
one is incorrect, the lance tests would silently fall back to parquet and pass
without actually exercising the lance code path.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]