This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9440590a909 [SPARK-38948][TESTS] Fix `DiskRowQueue` leak in `PythonForeachWriterSuite`
9440590a909 is described below
commit 9440590a909d9222db838426c8e528ddec90e196
Author: yangjie01 <[email protected]>
AuthorDate: Mon Apr 25 09:53:02 2022 +0900
[SPARK-38948][TESTS] Fix `DiskRowQueue` leak in `PythonForeachWriterSuite`
### What changes were proposed in this pull request?
This PR wraps the body of the `run` method of `BufferTester.thread` in a
`try-finally` and calls `buffer.close()` in the `finally` block, so that the
resources held by `BufferTester.buffer` are always released.
Before this PR, the `DiskRowQueue` resource held by `BufferTester.buffer` was
never closed.
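
The pattern can be sketched in isolation as follows. This is a minimal illustration that does not depend on Spark: `TrackedBuffer`, `TryFinallySketch`, `drain`, and `failAt` are hypothetical names introduced here, and the buffer only records whether `close()` was called instead of holding a real `DiskRowQueue`.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a buffer that owns a closeable resource.
// It only tracks whether close() has been called.
class TrackedBuffer(rows: Seq[Int]) extends AutoCloseable {
  private val closedFlag = new AtomicBoolean(false)
  def iterator: Iterator[Int] = rows.iterator
  def isClosed: Boolean = closedFlag.get()
  override def close(): Unit = closedFlag.set(true)
}

object TryFinallySketch {
  // Reads all rows from `buffer` on a separate thread and returns them.
  // `failAt` (an assumption for this sketch) makes the reader throw when it
  // sees that value, to simulate a failure in the middle of the loop.
  def drain(buffer: TrackedBuffer, failAt: Option[Int] = None): Seq[Int] = {
    val output = ArrayBuffer.empty[Int]
    val thread = new Thread() {
      override def run(): Unit = {
        try {
          val it = buffer.iterator
          while (it.hasNext) {
            val value = it.next()
            if (failAt.contains(value)) {
              throw new IllegalStateException(s"failed at $value")
            }
            output.synchronized { output += value }
          }
        } finally {
          // Runs on normal completion and on failure alike.
          buffer.close()
        }
      }
    }
    // Swallow the simulated failure so it is not printed to stderr.
    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = ()
    })
    thread.start()
    thread.join()
    output.synchronized { output.toList }
  }
}
```

Because the `close()` call sits in `finally`, the buffer is released whether the loop finishes normally or the reader thread dies with an exception.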
### Why are the changes needed?
Minor fix to a unit test (UT): the test no longer leaks the `DiskRowQueue`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes GitHub Actions (GA).
Closes #36261 from LuciferYang/SPARK-38948.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/execution/python/PythonForeachWriterSuite.scala | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
index 61c9782bd17..02d6ff87f89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
@@ -102,11 +102,15 @@ class PythonForeachWriterSuite extends SparkFunSuite with Eventually with Mockit
     private val intProj = UnsafeProjection.create(Array[DataType](IntegerType))
     private val thread = new Thread() {
       override def run(): Unit = {
-        while (iterator.hasNext) {
-          outputBuffer.synchronized {
-            outputBuffer += iterator.next().getInt(0)
+        try {
+          while (iterator.hasNext) {
+            outputBuffer.synchronized {
+              outputBuffer += iterator.next().getInt(0)
+            }
+            Thread.sleep(sleepPerRowReadMs)
           }
-          Thread.sleep(sleepPerRowReadMs)
+        } finally {
+          buffer.close()
         }
       }
     }