[ 
https://issues.apache.org/jira/browse/HUDI-3723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-3723:
---------------------------------
    Labels: pull-request-available  (was: )

> MOR MergeOnRead FilteringIterator StackOverflowError
> ----------------------------------------------------
>
>                 Key: HUDI-3723
>                 URL: https://issues.apache.org/jira/browse/HUDI-3723
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: sivabalan narayanan
>            Assignee: Alexey Kudinkin
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.11.0
>
>
> We run integration tests against Hudi at a regular cadence, and recently we 
> have been seeing a StackOverflowError with the MOR table in the long-running 
> Spark yaml suite. 
>  
> {code:java}
> 22/03/26 14:27:04 INFO ValidateDatasetNode: Validate data in target hudi path 
> basaePath/*/*/*
> 22/03/26 14:28:51 ERROR Executor: Exception in task 2.0 in stage 975.0 (TID 
> 17933)
> java.lang.StackOverflowError
>         at java.util.HashMap.removeNode(HashMap.java:821)
>         at java.util.HashMap.remove(HashMap.java:800)
>         at 
> org.apache.hudi.common.util.collection.ExternalSpillableMap.remove(ExternalSpillableMap.java:238)
>         at 
> org.apache.hudi.common.util.collection.ExternalSpillableMap.remove(ExternalSpillableMap.java:55)
>         at 
> scala.collection.convert.Wrappers$JMapWrapperLike.remove(Wrappers.scala:296)
>         at 
> scala.collection.convert.Wrappers$JMapWrapperLike.remove$(Wrappers.scala:296)
>         at 
> scala.collection.convert.Wrappers$JMapWrapper.remove(Wrappers.scala:317)
>         at 
> org.apache.hudi.HoodieMergeOnReadRDD$LogFileIterator.removeLogRecord(HoodieMergeOnReadRDD.scala:187)
>         at 
> org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:262)
>         at 
> org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>         at 
> org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>         at 
> org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>         at 
> org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>         at 
> org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>         at 
> org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>  
> .
> .
> .
> .{code}
> This repeats for some time and the job eventually fails. 
>  
> The likely root cause: in our iterator, when we encounter a delete record we 
> call hasNext() again so that we skip the current record and advance to the 
> next one. Each such call adds a frame to the call stack, so if this repeats 
> 8k or more times and the stack size in the corresponding JVM is small enough, 
> the job will fail. In reality there could be millions of delete records, so 
> we need to find a proper fix. For now, we are temporarily experimenting with 
> the "-Xss100m" JVM option to increase the stack size. 
>  
> Code snippet from HoodieMergeOnReadRDD, in particular the lines 
> {code:java}
> if (mergedAvroRecordOpt.isEmpty) { // Record has been deleted, skipping
>   this.hasNext
> {code}
> in the snippet below. 
>  
> {code:java}
> override def hasNext: Boolean = {
>   if (baseFileIterator.hasNext) {
>     val curRowRecord = baseFileIterator.next()
>     val curKey = curRowRecord.getString(recordKeyOrdinal)
>     val updatedRecordOpt = removeLogRecord(curKey)
>     if (updatedRecordOpt.isEmpty) {
>       // No merge needed, load current row with required projected schema
>       recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
>       true
>     } else {
>       val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get)
>       if (mergedAvroRecordOpt.isEmpty) {
>         // Record has been deleted, skipping
>         this.hasNext
>       } else {
>         // NOTE: In occurrence of a merge we can't know the schema of the record being returned, b/c
>         //       record from the Delta Log will bear (full) Table schema, while record from the Base file
>         //       might already be read in projected one (as an optimization).
>         //       As such we can't use more performant [[projectAvroUnsafe]], and instead have to fallback
>         //       to [[projectAvro]]
>         val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, requiredAvroSchema, recordBuilder)
>         recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
>         true
>       }
>     }
>   } else {
>     super[LogFileIterator].hasNext
>   }
> }
> {code}
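For illustration, here is a minimal, self-contained sketch of how the recursive self-call could be replaced with a loop, so that skipping any number of consecutive deleted records consumes constant stack depth. The `SkippingIterator` class below is a hypothetical simplification (Int records, a `deleted` key set standing in for the merge-returns-empty case), not the actual Hudi classes:

```scala
// Hypothetical simplification: base-file records are Ints, and membership in
// `deleted` stands in for merge() returning an empty result for that key.
class SkippingIterator(base: Iterator[Int], deleted: Set[Int]) extends Iterator[Int] {
  private var nextRecord: Option[Int] = None

  // Loop (instead of calling hasNext recursively) until we either find a
  // live record or exhaust the base iterator -- constant stack depth.
  override def hasNext: Boolean = {
    while (nextRecord.isEmpty && base.hasNext) {
      val cur = base.next()
      if (!deleted.contains(cur)) nextRecord = Some(cur)
      // else: record deleted, keep looping rather than recursing
    }
    nextRecord.isDefined
  }

  override def next(): Int = {
    if (!hasNext) throw new NoSuchElementException("iterator exhausted")
    val r = nextRecord.get
    nextRecord = None
    r
  }
}
```

With this shape, even millions of consecutive deletes only spin the while loop, whereas the original code pushes one hasNext frame per deleted record.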



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
