This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8baeb816d5 [HUDI-3723] Fixed stack overflows in Record Iterators 
(#5235)
8baeb816d5 is described below

commit 8baeb816d553f202c7bd4b6564d6955be46d74b5
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Tue Apr 5 20:12:13 2022 -0700

    [HUDI-3723] Fixed stack overflows in Record Iterators (#5235)
---
 .../scala/org/apache/hudi/HoodieMergeOnReadRDD.scala | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 05c98e3aeb..d40cf49434 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -50,6 +50,7 @@ import org.apache.spark.{Partition, SerializableWritable, 
SparkContext, TaskCont
 
 import java.io.Closeable
 import java.util.Properties
+import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.util.Try
 
@@ -188,17 +189,23 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: 
HoodieRecordPayload[_]]] =
       logRecords.remove(key)
 
-    override def hasNext: Boolean =
+    override def hasNext: Boolean = hasNextInternal
+
+    // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
+    //       that recursion is unfolded into a loop to avoid stack overflows 
while
+    //       handling records
+    @tailrec private def hasNextInternal: Boolean = {
       logRecordsIterator.hasNext && {
         val avroRecordOpt = logRecordsIterator.next()
         if (avroRecordOpt.isEmpty) {
           // Record has been deleted, skipping
-          this.hasNext
+          this.hasNextInternal
         } else {
           recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
           true
         }
       }
+    }
 
     override final def next(): InternalRow = recordToLoad
 
@@ -257,7 +264,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
 
     private val recordKeyOrdinal = 
baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
 
-    override def hasNext: Boolean = {
+    override def hasNext: Boolean = hasNextInternal
+
+    // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to 
make sure
+    //       that recursion is unfolded into a loop to avoid stack overflows 
while
+    //       handling records
+    @tailrec private def hasNextInternal: Boolean = {
       if (baseFileIterator.hasNext) {
         val curRowRecord = baseFileIterator.next()
         val curKey = curRowRecord.getString(recordKeyOrdinal)
@@ -270,7 +282,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
           val mergedAvroRecordOpt = merge(serialize(curRowRecord), 
updatedRecordOpt.get)
           if (mergedAvroRecordOpt.isEmpty) {
             // Record has been deleted, skipping
-            this.hasNext
+            this.hasNextInternal
           } else {
             // NOTE: In occurrence of a merge we can't know the schema of the 
record being returned, b/c
             //       record from the Delta Log will bear (full) Table schema, 
while record from the Base file

Reply via email to