This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new d4f46adad [KYUUBI #6469] Lazily initialize RecordReaderIterator to avoid driver OOM when fetching big result set
d4f46adad is described below
commit d4f46adad3069df5a0156b110727c978198b7ec5
Author: 吴梓溢 <[email protected]>
AuthorDate: Thu Jun 20 11:06:20 2024 +0800
[KYUUBI #6469] Lazily initialize RecordReaderIterator to avoid driver OOM when fetching big result set
# :mag: Description
## Issue References 🔗
This pull request fixes https://github.com/apache/kyuubi/issues/6469
## Describe Your Solution 🔧
Instead of initializing a RecordReaderIterator for every file when the
OrcFileIterator is created, we can initialize them lazily, so that at any time
only one RecordReaderIterator, the one reading the file currently being fetched
by the client, is held in driver memory.
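For illustration, a minimal sketch of the lazy-chaining pattern, independent of Spark and ORC (the `LazyChainedIterator` name and the `openFile` parameter are hypothetical, not part of this patch; the `close()` call that the real patch makes on each exhausted RecordReaderIterator is elided, since plain `Iterator` has no `close`):

```scala
import scala.collection.mutable.ListBuffer

// Chains per-file iterators lazily: only the iterator for the file
// currently being consumed is open; the next one is created on demand,
// so at most one underlying reader lives in memory at a time.
class LazyChainedIterator[T](files: ListBuffer[String], openFile: String => Iterator[T])
  extends Iterator[T] {

  private var idx = 0
  private var cur: Option[Iterator[T]] = advance()

  // Open the next file's iterator, or None once the file list is exhausted.
  private def advance(): Option[Iterator[T]] = {
    if (idx >= files.size) return None
    val it = openFile(files(idx))
    idx += 1
    Some(it)
  }

  override def hasNext: Boolean = {
    // Skip empty files until an element is found or the list runs out.
    while (cur.isDefined) {
      if (cur.get.hasNext) return true
      cur = advance() // the real patch also closes the exhausted reader here
    }
    false
  }

  override def next(): T = cur.get.next()
}

// Example: the iterator for "b" is only created after "a" is fully drained.
// new LazyChainedIterator[String](ListBuffer("a", "b"), f => Iterator(s"$f-1", s"$f-2")).toList
```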
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
Closes #6470 from Z1Wu/bugfix-fetch-big-resultset-lazily.
Closes #6469
83208018c [吴梓溢] update
56284e68e [吴梓溢] update
Authored-by: 吴梓溢 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 0a53415d929dfe477f9c31f707e559c09dd175a0)
Signed-off-by: Cheng Pan <[email protected]>
---
.../engine/spark/operation/FetchOrcStatement.scala | 30 +++++++++++++---------
1 file changed, 18 insertions(+), 12 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
index c19017077..44a2e887a 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
@@ -108,23 +108,29 @@ class FetchOrcStatement(spark: SparkSession) {
class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {
- private val iters = fileList.map(x => getOrcFileIterator(x))
+ private var idx = 0
+ private var curIter = getNextIter
- var idx = 0
+ private def getNextIter: Option[RecordReaderIterator[OrcStruct]] = {
+ if (idx >= fileList.size) return None
+ val resIter = getOrcFileIterator(fileList(idx))
+ idx = idx + 1
+ Some(resIter)
+ }
override def hasNext: Boolean = {
- if (idx >= iters.size) return false
- val hasNext = iters(idx).hasNext
+ if (curIter.isEmpty) return false
+ val hasNext = curIter.get.hasNext
if (!hasNext) {
- iters(idx).close()
- idx += 1
+ curIter.get.close()
+ curIter = getNextIter
// skip empty file
- while (idx < iters.size) {
- if (iters(idx).hasNext) {
+ while (curIter.isDefined) {
+ if (curIter.get.hasNext) {
return true
} else {
- iters(idx).close()
- idx = idx + 1
+ curIter.get.close()
+ curIter = getNextIter
}
}
}
@@ -132,11 +138,11 @@ class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[
}
override def next(): OrcStruct = {
- iters(idx).next()
+ curIter.get.next()
}
def close(): Unit = {
- iters.foreach(_.close())
+ curIter.foreach(_.close())
}
private def getOrcFileIterator(file: LocatedFileStatus): RecordReaderIterator[OrcStruct] = {