[
https://issues.apache.org/jira/browse/HUDI-152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17345808#comment-17345808
]
Nishith Agarwal commented on HUDI-152:
--------------------------------------
The parquet and log records are combined in
`HoodieRealtimeCompactedRecordReader`, where the following code is invoked:
{code:java}
if (!deltaRecordMap.isEmpty()) {
  // TODO(VC): Right now, we assume all records in log have a matching base record
  // (which would be true until we have a way to index logs too).
  // Return from delta records map if we have some match.
  String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
  if (deltaRecordMap.containsKey(key)) {
    // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is
    // required since the deltaRecord may not be a full record and needs values of
    // columns from the parquet.
    Option<GenericRecord> rec;
    // <----- (1) latest record is retrieved from the log files
    rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
    // If the record is not present, this is a delete record using an empty payload,
    // so skip this base record and move to the next record.
    while (!rec.isPresent()) {
      // if the current parquet reader has no record, return false
      if (!this.parquetReader.next(aVoid, arrayWritable)) {
        return false;
      }
      String tempKey = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
      if (deltaRecordMap.containsKey(tempKey)) {
        rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(tempKey));
      } else {
        // Need to return true, since the log file does not contain tempKey,
        // but the parquet file contains tempKey.
        return true;
      }
    }
{code}
Later:
{code:java}
Writable[] originalValue = arrayWritable.get();
try {
  // Sometimes originalValue.length > replaceValue.length. This can happen when a
  // hive query is looking for pseudo parquet columns like BLOCK_OFFSET_INSIDE_FILE.
  // <---- (2) we simply copy the data from the log record over the original parquet record
  System.arraycopy(replaceValue, 0, originalValue, 0,
      Math.min(originalValue.length, replaceValue.length));
  arrayWritable.set(originalValue);
}
{code}
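The truncating copy at (2) can be seen in isolation with a small self-contained sketch. Plain `String[]` arrays stand in for the `Writable[]` cells, and the row values are hypothetical; the point is that when the base (parquet) row carries extra pseudo columns, only the first `replaceValue.length` cells are overwritten and the trailing cells survive:
{code:java}
public class ArrayCopySketch {
    public static void main(String[] args) {
        // Hypothetical base row with a trailing pseudo column (e.g. BLOCK_OFFSET_INSIDE_FILE).
        String[] originalValue = {"key1", "oldVal", "42"};
        // Hypothetical merged row built from the log record (no pseudo column).
        String[] replaceValue = {"key1", "newVal"};
        // Copy only as many cells as the shorter of the two rows, exactly as in (2).
        System.arraycopy(replaceValue, 0, originalValue, 0,
            Math.min(originalValue.length, replaceValue.length));
        // originalValue is now {"key1", "newVal", "42"}: the pseudo column is preserved.
        System.out.println(java.util.Arrays.toString(originalValue));
    }
}
{code}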
Before calling (2), we should convert the record from parquet, i.e.
arrayWritable.get(), to a GenericRecord and then call preCombine() so that the
payload's preCombine() semantics are honored when merging the two records.
[~shivnarayan] FYI
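A sketch of that idea, reduced to plain Java so it stands alone: all names here (`Payload`, `orderingVal`, the map-based rows) are hypothetical stand-ins for the real `HoodieRecordPayload`/`GenericRecord` types, not the actual Hudi API. The base row reconstructed from parquet and the partial delta record are both wrapped in a payload, preCombine picks the winner by ordering value, and columns missing from the partial winner are backfilled from the other side:
{code:java}
import java.util.HashMap;
import java.util.Map;

public class PreCombineSketch {

    // Stand-in for a (possibly partial) record payload plus its ordering value.
    record Payload(Map<String, String> values, long orderingVal) {
        // Mimics the shape of HoodieRecordPayload#preCombine: keep the payload
        // with the larger ordering value, backfilling columns it is missing
        // from the other payload.
        Payload preCombine(Payload other) {
            Payload winner = this.orderingVal >= other.orderingVal ? this : other;
            Payload loser = (winner == this) ? other : this;
            Map<String, String> merged = new HashMap<>(loser.values());
            merged.putAll(winner.values()); // winner's columns take precedence
            return new Payload(merged, winner.orderingVal());
        }
    }

    public static void main(String[] args) {
        // Full base record converted from the parquet row (arrayWritable.get()).
        Payload base = new Payload(Map.of("key", "k1", "a", "1", "b", "2"), 5L);
        // Partial delta record from the log: only column "a" updated, newer ordering value.
        Payload delta = new Payload(Map.of("key", "k1", "a", "9"), 7L);
        Payload merged = delta.preCombine(base);
        // Column "a" comes from the delta, column "b" survives from parquet.
        System.out.println(merged.values());
    }
}
{code}
With this shape, step (2) would copy the preCombine result (rather than the raw delta record) over the parquet row, so partial updates no longer clobber columns the delta did not touch.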
> Invoke preCombine in real time view by converting arrayWritable to Avro
> -----------------------------------------------------------------------
>
> Key: HUDI-152
> URL: https://issues.apache.org/jira/browse/HUDI-152
> Project: Apache Hudi
> Issue Type: Bug
> Components: Hive Integration
> Reporter: Nishith Agarwal
> Assignee: Nishith Agarwal
> Priority: Major
> Labels: sev:critical, user-support-issues
>
> Delta records (updates) might not carry the entire row change log. For such an
> update, we need to be able to call preCombine of the HoodieRecordPayload
> implementation so that we merge the existing data from parquet (the full row
> change log) with the new column being updated.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)