This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a53415d9 [KYUUBI #6469] Lazily initialize RecordReaderIterator to 
avoid driver oom when fetching big result set
0a53415d9 is described below

commit 0a53415d929dfe477f9c31f707e559c09dd175a0
Author: 吴梓溢 <[email protected]>
AuthorDate: Thu Jun 20 11:06:20 2024 +0800

    [KYUUBI #6469] Lazily initialize RecordReaderIterator to avoid driver oom 
when fetching big result set
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes https://github.com/apache/kyuubi/issues/6469
    
    ## Describe Your Solution 🔧
    
    Instead of initializing every RecordReaderIterator when creating the 
OrcFileIterator, we lazily initialize each RecordReaderIterator so that only 
one RecordReaderIterator (the one reading the file currently being fetched by 
the client) is held in driver memory at a time.
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    Closes #6470 from Z1Wu/bugfix-fetch-big-resultset-lazily.
    
    Closes #6469
    
    83208018c [吴梓溢] update
    56284e68e [吴梓溢] update
    
    Authored-by: 吴梓溢 <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../engine/spark/operation/FetchOrcStatement.scala | 30 +++++++++++++---------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
index 75a6c70b5..1059eaa3b 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
@@ -76,23 +76,29 @@ class FetchOrcStatement(spark: SparkSession) {
 
 class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends 
Iterator[OrcStruct] {
 
-  private val iters = fileList.map(x => getOrcFileIterator(x))
+  private var idx = 0
+  private var curIter = getNextIter
 
-  var idx = 0
+  private def getNextIter: Option[RecordReaderIterator[OrcStruct]] = {
+    if (idx >= fileList.size) return None
+    val resIter = getOrcFileIterator(fileList(idx))
+    idx = idx + 1
+    Some(resIter)
+  }
 
   override def hasNext: Boolean = {
-    if (idx >= iters.size) return false
-    val hasNext = iters(idx).hasNext
+    if (curIter.isEmpty) return false
+    val hasNext = curIter.get.hasNext
     if (!hasNext) {
-      iters(idx).close()
-      idx += 1
+      curIter.get.close()
+      curIter = getNextIter
       // skip empty file
-      while (idx < iters.size) {
-        if (iters(idx).hasNext) {
+      while (curIter.isDefined) {
+        if (curIter.get.hasNext) {
           return true
         } else {
-          iters(idx).close()
-          idx = idx + 1
+          curIter.get.close()
+          curIter = getNextIter
         }
       }
     }
@@ -100,11 +106,11 @@ class OrcFileIterator(fileList: 
ListBuffer[LocatedFileStatus]) extends Iterator[
   }
 
   override def next(): OrcStruct = {
-    iters(idx).next()
+    curIter.get.next()
   }
 
   def close(): Unit = {
-    iters.foreach(_.close())
+    curIter.foreach(_.close())
   }
 
   private def getOrcFileIterator(file: LocatedFileStatus): 
RecordReaderIterator[OrcStruct] = {

Reply via email to