[ https://issues.apache.org/jira/browse/HUDI-152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17345808#comment-17345808 ]

Nishith Agarwal commented on HUDI-152:
--------------------------------------

The parquet + log records are combined in `HoodieRealtimeCompactedRecordReader`.

Here, the following code is invoked:

{code:java}
if (!deltaRecordMap.isEmpty()) {
  // TODO(VC): Right now, we assume all records in log have a matching base record
  // (which would be true until we have a way to index logs too)
  // return from delta records map if we have some match.
  String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
  if (deltaRecordMap.containsKey(key)) {
    // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro.
    // This is required since the deltaRecord may not be a full record and needs
    // values of columns from the parquet
    Option<GenericRecord> rec;
    rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key)); // <----- (1) latest record is retrieved from log files
    // If the record is not present, this is a delete record using an empty payload,
    // so skip this base record and move to the next record
    while (!rec.isPresent()) {
      // if the current parquet reader has no record, return false
      if (!this.parquetReader.next(aVoid, arrayWritable)) {
        return false;
      }
      String tempKey = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
      if (deltaRecordMap.containsKey(tempKey)) {
        rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(tempKey));
      } else {
        // need to return true, since the log file does not contain tempKey,
        // but the parquet file does
        return true;
      }
    }
{code}

Later:
{code:java}
Writable[] originalValue = arrayWritable.get();
try {
  // Sometimes originalValue.length > replaceValue.length.
  // This can happen when a Hive query is looking for pseudo parquet columns
  // like BLOCK_OFFSET_INSIDE_FILE
  System.arraycopy(replaceValue, 0, originalValue, 0,
      Math.min(originalValue.length, replaceValue.length)); // <---- (2) we simply copy the data from the log record onto the original record from parquet
  arrayWritable.set(originalValue);
}
{code}
Before calling (2), we should convert the record from parquet, i.e. arrayWritable.get(), to a GenericRecord and then call preCombine(), so that merging the two records honors the payload's preCombine() semantics.
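To make the intent concrete, here is a minimal, self-contained sketch in plain Java. It does NOT use Hudi's actual classes: the Record class, preCombine and mergePartial names are invented stand-ins for an Avro GenericRecord and a HoodieRecordPayload's preCombine, chosen only to illustrate the desired semantics of merging a partial delta record with the full base record from parquet.

```java
// Hypothetical sketch, not Hudi's real API: models preCombine-style merging
// between a partial log record and a full parquet record.
import java.util.HashMap;
import java.util.Map;

public class PreCombineSketch {

  /** Simplified stand-in for an Avro GenericRecord: column values plus an
   *  ordering field (e.g. a commit time or precombine key). */
  static class Record {
    final Map<String, String> columns;
    final long orderingValue;

    Record(Map<String, String> columns, long orderingValue) {
      this.columns = columns;
      this.orderingValue = orderingValue;
    }
  }

  /** preCombine semantics: between two versions of the same key, keep the one
   *  with the higher ordering value (ties go to the first argument). */
  static Record preCombine(Record a, Record b) {
    return a.orderingValue >= b.orderingValue ? a : b;
  }

  /** Merge a partial delta record onto the full base record: columns present
   *  in the delta win, columns it lacks fall back to the base (parquet) values. */
  static Record mergePartial(Record base, Record delta) {
    Map<String, String> merged = new HashMap<>(base.columns);
    merged.putAll(delta.columns); // delta columns overwrite base columns
    return new Record(merged, Math.max(base.orderingValue, delta.orderingValue));
  }

  public static void main(String[] args) {
    Record base = new Record(Map.of("name", "alice", "city", "sf"), 1L);  // full row from parquet
    Record delta = new Record(Map.of("city", "nyc"), 2L);                 // partial update from log
    Record merged = mergePartial(base, delta);
    System.out.println(merged.columns); // keeps "name" from parquet, takes "city" from the log
  }
}
```

The key point the sketch captures: a step like (2) that blindly array-copies the log record over the parquet record would lose the base columns that the partial delta does not carry, whereas a preCombine-aware merge keeps them.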

 

[~shivnarayan] FYI

 

> Invoke preCombine in real time view by converting arrayWritable to Avro
> -----------------------------------------------------------------------
>
>                 Key: HUDI-152
>                 URL: https://issues.apache.org/jira/browse/HUDI-152
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Hive Integration
>            Reporter: Nishith Agarwal
>            Assignee: Nishith Agarwal
>            Priority: Major
>              Labels: sev:critical, user-support-issues
>
> Delta records (updates) might not carry the entire row; in such a case, we need to be able to call preCombine() of the HoodieRecordPayload implementation so that we merge the existing data from parquet (the full row) with the new columns being updated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
