xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r725746607
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##########
@@ -102,97 +102,107 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept
@Override
public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
+ // deal with DeltaOnlySplits
+ if (logReader.isPresent()) {
+ return logReader.get().next(aVoid, arrayWritable);
+ }
// Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
// with a new block of values
- while (this.parquetReader.next(aVoid, arrayWritable)) {
- if (!deltaRecordMap.isEmpty()) {
- String key = arrayWritable.get()[recordKeyIndex].toString();
- if (deltaRecordMap.containsKey(key)) {
- // mark the key as handled
- this.deltaRecordKeys.remove(key);
- // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
- // deltaRecord may not be a full record and needs values of columns from the parquet
- Option&lt;GenericRecord&gt; rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
- // If the record is not present, this is a delete record using an empty payload so skip this base record
- // and move to the next record
- if (!rec.isPresent()) {
- continue;
+ boolean result = this.parquetReader.next(aVoid, arrayWritable);
+ if (!result) {
+ // if the result is false, then there are no more records
+ return false;
Review comment:
same comments as
https://github.com/apache/hudi/pull/3203#discussion_r725746415
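For context, the merge-on-read flow the diff above touches can be sketched with a simplified, self-contained model. This is not the real Hudi API (`MergeOnReadSketch`, the `List<String[]>` base representation, and the `Optional<String>` delta payloads are all illustrative stand-ins): base-file records are overlaid by key with log (delta) records, an empty payload means a delete, and keys seen only in the log are emitted afterwards, which is what the delta-only-split branch handles.

```java
import java.util.*;

// Simplified model (NOT the actual Hudi classes) of merge-on-read iteration:
// base records are overlaid with delta records by key; an absent delta
// payload represents a delete.
public class MergeOnReadSketch {
  static List<String> merge(List<String[]> base,                 // each entry: {key, value}
                            Map<String, Optional<String>> delta) {
    Set<String> pendingDeltaKeys = new HashSet<>(delta.keySet());
    List<String> out = new ArrayList<>();
    for (String[] record : base) {
      String key = record[0];
      if (delta.containsKey(key)) {
        pendingDeltaKeys.remove(key);        // mark the key as handled
        Optional<String> merged = delta.get(key);
        if (!merged.isPresent()) {
          continue;                          // empty payload => delete: skip the base record
        }
        out.add(merged.get());               // delta value wins over the base value
      } else {
        out.add(record[1]);                  // base record passes through unchanged
      }
    }
    // keys present only in the log files ("delta-only") are emitted last
    for (String key : pendingDeltaKeys) {
      delta.get(key).ifPresent(out::add);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String[]> base = Arrays.asList(
        new String[]{"k1", "base1"},
        new String[]{"k2", "base2"});
    Map<String, Optional<String>> delta = new LinkedHashMap<>();
    delta.put("k1", Optional.of("delta1"));  // update of a base record
    delta.put("k2", Optional.empty());       // delete of a base record
    delta.put("k3", Optional.of("delta3"));  // insert seen only in the log
    System.out.println(merge(base, delta));  // prints [delta1, delta3]
  }
}
```

The early-return in the patched `next` corresponds to the base loop ending here: once the base reader is exhausted (or the split has no base file at all), only the pending delta records remain to be served.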
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]