vinothchandar commented on a change in pull request #2190:
URL: https://github.com/apache/hudi/pull/2190#discussion_r511530095
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##########
@@ -85,53 +85,55 @@ public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOEx
// if the result is false, then there are no more records
return false;
} else {
-      // TODO(VC): Right now, we assume all records in log, have a matching base record. (which
-      // would be true until we have a way to index logs too)
-      // return from delta records map if we have some match.
-      String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
-      if (deltaRecordMap.containsKey(key)) {
-        // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
-        // deltaRecord may not be a full record and needs values of columns from the parquet
-        Option<GenericRecord> rec;
-        if (usesCustomPayload) {
-          rec = deltaRecordMap.get(key).getData().getInsertValue(getWriterSchema());
-        } else {
-          rec = deltaRecordMap.get(key).getData().getInsertValue(getReaderSchema());
-        }
-        if (!rec.isPresent()) {
-          // If the record is not present, this is a delete record using an empty payload so skip this base record
-          // and move to the next record
-          return next(aVoid, arrayWritable);
-        }
-        GenericRecord recordToReturn = rec.get();
-        if (usesCustomPayload) {
-          // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
-          // the writerSchema with only the projection fields
-          recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema());
-        }
-        // we assume, a later safe record in the log, is newer than what we have in the map &
-        // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
-        // schema, we use writerSchema to create the arrayWritable from the latest generic record
-        ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema());
-        Writable[] replaceValue = aWritable.get();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable),
-              HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable)));
-        }
-        Writable[] originalValue = arrayWritable.get();
-        try {
-          // Sometime originalValue.length > replaceValue.length.
-          // This can happen when hive query is looking for pseudo parquet columns like BLOCK_OFFSET_INSIDE_FILE
-          System.arraycopy(replaceValue, 0, originalValue, 0,
-              Math.min(originalValue.length, replaceValue.length));
-          arrayWritable.set(originalValue);
-        } catch (RuntimeException re) {
-          LOG.error("Got exception when doing array copy", re);
-          LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable));
-          LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable));
-          String errMsg = "Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable)
-              + " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
-          throw new RuntimeException(errMsg, re);
+      if (!deltaRecordMap.isEmpty()) {
Review comment:
I think we can structure this as an if block without needing the else, since the if above returns out anyway?
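
For illustration only, a minimal sketch of the shape being suggested, assuming the merge logic stays as in the diff above (the `hasNextBaseRecord` flag below is hypothetical, standing in for the parquet reader's next() result):

    // if the base file reader has no more records, bail out as before
    if (!hasNextBaseRecord) {   // hypothetical flag, not a field in this class
      return false;
    }
    // early return instead of an else branch: nothing to merge, the base record is already populated
    if (deltaRecordMap.isEmpty()) {
      return true;
    }
    // merge path continues at the top level, one indentation level shallower
    String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
    // ... existing delta-record merge logic from the diff above ...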
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]