[
https://issues.apache.org/jira/browse/HUDI-1397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Xianjin YE updated HUDI-1397:
-----------------------------
Description:
Hi, we were writing our internal payload class and found that the behavior of
Hudi's RealtimeCompactedRecordReader doesn't match that of HoodieMergeOnReadRDD.
To be specific, when reading a delta record with merging of log and base
enabled, the expected behavior is to merge the base record with the delta
record. This is handled correctly by `HoodieMergeOnReadRDD`'s `mergeRowWithLog`
method:
{code:java}
private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
  val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
  logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
}
{code}
However, similar logic cannot be found in `RealtimeCompactedRecordReader`;
it simply assumes that the record in the delta log is the latest.
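To make the difference concrete, here is a minimal, self-contained sketch. It does not use Hudi's real classes: `SketchPayload`, `PartialUpdatePayload` and `MergeSketch` are hypothetical stand-ins, records are modelled as plain maps instead of Avro records, and the partial-update payload is only one example of a custom payload for which the two read paths would disagree.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a record payload; NOT Hudi's HoodieRecordPayload interface.
interface SketchPayload {
    // The payload's own value, ignoring whatever is stored in the base file.
    Map<String, Object> getInsertValue();

    // Combine this delta payload with the record currently stored in the base file.
    Optional<Map<String, Object>> combineAndGetUpdateValue(Map<String, Object> baseRecord);
}

// Example custom payload (an assumption for illustration): null fields in the
// delta mean "keep the base value", so the result depends on the base record.
final class PartialUpdatePayload implements SketchPayload {
    private final Map<String, Object> delta;

    PartialUpdatePayload(Map<String, Object> delta) {
        this.delta = delta;
    }

    @Override
    public Map<String, Object> getInsertValue() {
        return delta;
    }

    @Override
    public Optional<Map<String, Object>> combineAndGetUpdateValue(Map<String, Object> baseRecord) {
        Map<String, Object> merged = new HashMap<>(baseRecord);
        delta.forEach((field, value) -> {
            if (value != null) {
                merged.put(field, value); // only non-null delta fields overwrite the base
            }
        });
        return Optional.of(merged);
    }
}

final class MergeSketch {
    // Behavior reported for RealtimeCompactedRecordReader: the log record is taken as-is.
    static Map<String, Object> readAssumingLogIsLatest(Map<String, Object> base, SketchPayload log) {
        return log.getInsertValue();
    }

    // Behavior of mergeRowWithLog: the payload decides how base and delta are combined.
    static Optional<Map<String, Object>> readWithMerge(Map<String, Object> base, SketchPayload log) {
        return log.combineAndGetUpdateValue(base);
    }
}
```

With a base record `{id=1, name="a", city="x"}` and a delta `{id=1, name=null, city="y"}`, `readAssumingLogIsLatest` returns a record whose `name` is null, while `readWithMerge` keeps `name="a"` and updates `city` to `"y"`.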
cc [~garyli1019] since you wrote the `HoodieMergeOnReadRDD` code. It would be
wonderful to merge the base and delta records in `RealtimeCompactedRecordReader`
as well.
Also cc [~wayblink].
was:
Hi, we were writing our internal payload class and found that the behavior of
Hudi's RealtimeCompactedRecordReader doesn't match that of HoodieMergeOnReadRDD.
To be specific, when reading a delta record with merging of log and base
enabled, the expected behavior is to merge the base record with the delta
record. This is handled correctly by `HoodieMergeOnReadRDD`'s `mergeRowWithLog`
method:
{code:java}
private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
  val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
  logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
}
{code}
However, similar logic cannot be found in `RealtimeCompactedRecordReader`;
it simply assumes that the record in the delta log is the latest.
cc [~garyli1019] since you wrote the `HoodieMergeOnReadRDD` code. It would be
wonderful to merge the base and delta records in `RealtimeCompactedRecordReader`
as well.
> Different behavior between RealtimeCompactedRecordReader and
> HoodieMergeOnReadRDD
> ---------------------------------------------------------------------------------
>
> Key: HUDI-1397
> URL: https://issues.apache.org/jira/browse/HUDI-1397
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Xianjin YE
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)