This is an automated email from the ASF dual-hosted git repository.
garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 65844a8  [HUDI-1720] Fix RealtimeCompactedRecordReader StackOverflowError (#2721)
65844a8 is described below
commit 65844a8d29b715227f3b2ecdf9b102a08950ee67
Author: xiarixiaoyao <[email protected]>
AuthorDate: Tue Apr 13 18:23:26 2021 +0800
[HUDI-1720] Fix RealtimeCompactedRecordReader StackOverflowError (#2721)
---
.../realtime/RealtimeCompactedRecordReader.java | 31 +++++++++++++++++-----
1 file changed, 24 insertions(+), 7 deletions(-)
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index a98a230..1ae25f8 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -77,6 +77,14 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
.build();
}
+  private Option<GenericRecord> buildGenericRecordwithCustomPayload(HoodieRecord record) throws IOException {
+ if (usesCustomPayload) {
+ return record.getData().getInsertValue(getWriterSchema());
+ } else {
+ return record.getData().getInsertValue(getReaderSchema());
+ }
+ }
+
@Override
  public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
    // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
@@ -95,15 +103,24 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
        // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
        // deltaRecord may not be a full record and needs values of columns from the parquet
Option<GenericRecord> rec;
- if (usesCustomPayload) {
-          rec = deltaRecordMap.get(key).getData().getInsertValue(getWriterSchema());
- } else {
-          rec = deltaRecordMap.get(key).getData().getInsertValue(getReaderSchema());
+ rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
+        // If the record is not present, this is a delete record using an empty payload so skip this base record
+ // and move to the next record
+ while (!rec.isPresent()) {
+ // if current parquet reader has no record, return false
+ if (!this.parquetReader.next(aVoid, arrayWritable)) {
+ return false;
+ }
+          String tempKey = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
+ if (deltaRecordMap.containsKey(tempKey)) {
+            rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(tempKey));
+ } else {
+            // need to return true, since now log file does not contain tempKey, but parquet file contains tempKey
+ return true;
+ }
}
if (!rec.isPresent()) {
-        // If the record is not present, this is a delete record using an empty payload so skip this base record
- // and move to the next record
- return next(aVoid, arrayWritable);
+ return false;
}
GenericRecord recordToReturn = rec.get();
if (usesCustomPayload) {