This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 65844a8  [HUDI-1720] Fix RealtimeCompactedRecordReader StackOverflowError (#2721)
65844a8 is described below

commit 65844a8d29b715227f3b2ecdf9b102a08950ee67
Author: xiarixiaoyao <[email protected]>
AuthorDate: Tue Apr 13 18:23:26 2021 +0800

    [HUDI-1720] Fix RealtimeCompactedRecordReader StackOverflowError (#2721)
---
 .../realtime/RealtimeCompactedRecordReader.java    | 31 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index a98a230..1ae25f8 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -77,6 +77,14 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         .build();
   }
 
+  private Option<GenericRecord> buildGenericRecordwithCustomPayload(HoodieRecord record) throws IOException {
+    if (usesCustomPayload) {
+      return record.getData().getInsertValue(getWriterSchema());
+    } else {
+      return record.getData().getInsertValue(getReaderSchema());
+    }
+  }
+
   @Override
   public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
     // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
@@ -95,15 +103,24 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
         // deltaRecord may not be a full record and needs values of columns from the parquet
         Option<GenericRecord> rec;
-        if (usesCustomPayload) {
-          rec = deltaRecordMap.get(key).getData().getInsertValue(getWriterSchema());
-        } else {
-          rec = deltaRecordMap.get(key).getData().getInsertValue(getReaderSchema());
+        rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
+        // If the record is not present, this is a delete record using an empty payload so skip this base record
+        // and move to the next record
+        while (!rec.isPresent()) {
+          // if current parquet reader has no record, return false
+          if (!this.parquetReader.next(aVoid, arrayWritable)) {
+            return false;
+          }
+          String tempKey = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
+          if (deltaRecordMap.containsKey(tempKey)) {
+            rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(tempKey));
+          } else {
+            // need to return true, since now log file does not contain tempKey, but parquet file contains tempKey
+            return true;
+          }
         }
         if (!rec.isPresent()) {
-          // If the record is not present, this is a delete record using an empty payload so skip this base record
-          // and move to the next record
-          return next(aVoid, arrayWritable);
+          return false;
         }
         GenericRecord recordToReturn = rec.get();
         if (usesCustomPayload) {

Reply via email to