bibhu107 commented on code in PR #12201:
URL: https://github.com/apache/hudi/pull/12201#discussion_r1835739524
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala:
##########
@@ -164,6 +163,34 @@ class ExpressionPayload(@transient record: GenericRecord,
}
}
+ private def doRecordMerge(incomingRecord: GenericRecord,
+ existingRecord: IndexedRecord,
+ schema: Schema,
+ properties: Properties): HOption[IndexedRecord] = {
+ val originalPayload = properties.getProperty(PAYLOAD_ORIGINAL_AVRO_PAYLOAD)
+ if (originalPayload.equals(classOf[OverwriteWithLatestAvroPayload].getName)) {
+ HOption.of(incomingRecord)
+ } else if (originalPayload.equals(classOf[DefaultHoodieRecordPayload].getName)) {
+ if (needUpdatingPersistedRecord(existingRecord, incomingRecord, properties)) {
Review Comment:
Updates are typically triggered by an ordering date field such as
TransactionDate. A client may, however, send an incoming record with a new
schema whose TransactionDate equals that of the existing record. In that
case `needUpdatingPersistedRecord` should return true so that the existing
record is updated with the new data and schema evolution keeps working.
Is that case handled by `needUpdatingPersistedRecord`?
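
To make the tie case concrete, here is a minimal sketch of the comparison
being asked about. It is not Hudi's implementation: `Rec`, `TieBreakSketch`
and `shouldUpdate` are hypothetical names, and a plain `Long` stands in for
the TransactionDate ordering value.

```scala
// Hypothetical stand-in for a record; not a Hudi or Avro type.
final case class Rec(transactionDate: Long, fields: Map[String, String])

object TieBreakSketch {
  // Returns true when the incoming record should replace the persisted one.
  // Using <= (rather than <) means equal ordering values also count as an
  // update, which is the behavior this comment asks for.
  def shouldUpdate(existing: Rec, incoming: Rec): Boolean =
    existing.transactionDate <= incoming.transactionDate
}
```

With a strict `<` instead, an incoming record carrying the same
TransactionDate but an evolved schema would be dropped in favor of the
persisted one.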