This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 35d00618d92e [SPARK-45814][CONNECT][SQL] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak
35d00618d92e is described below
commit 35d00618d92e855d7b0bd2551b48309d07f4d180
Author: xieshuaihu <[email protected]>
AuthorDate: Thu Nov 9 15:56:40 2023 +0800
[SPARK-45814][CONNECT][SQL] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak
### What changes were proposed in this pull request?
Make `ArrowBatchIterator` implement `AutoCloseable` and have `ArrowConverters.createEmptyArrowBatch()` call `close()` to avoid a memory leak.
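As a rough illustration of the pattern the patch applies (a simplified sketch, not Spark's actual classes): an iterator that owns native resources implements `AutoCloseable`, so the task-completion listener, normal exhaustion, and direct callers all funnel through a single `close()`:

```scala
// Simplified sketch of the resource-owning iterator pattern used by the fix.
// ResourceBackedIterator, releaseRoot, and releaseAllocator are hypothetical
// stand-ins for ArrowBatchIterator and its VectorSchemaRoot/BufferAllocator.
class ResourceBackedIterator(rows: Iterator[Array[Byte]])
    extends Iterator[Array[Byte]] with AutoCloseable {

  private var closed = false

  private def releaseRoot(): Unit = println("root released")
  private def releaseAllocator(): Unit = println("allocator released")

  // As in the patch: exhausting the iterator triggers cleanup...
  override def hasNext: Boolean = rows.hasNext || { close(); false }

  override def next(): Array[Byte] = rows.next()

  // ...and callers that never exhaust the iterator can clean up directly.
  override def close(): Unit = if (!closed) {
    closed = true
    releaseRoot()
    releaseAllocator()
  }
}
```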
### Why are the changes needed?
`ArrowConverters.createEmptyArrowBatch` doesn't call `super.hasNext`, so if `TaskContext.get` returns `None`, the memory allocated in `ArrowBatchIterator` is leaked.
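Concretely, the pre-fix call site (condensed from the diff below) pins `hasNext` to `true`, so the cleanup branch in the parent's `hasNext` is unreachable, and when `TaskContext.get` returns null no completion listener is registered either:

```scala
// Pre-fix shape of createEmptyArrowBatch (condensed from the diff below).
// hasNext is forced to true, so the parent's cleanup-on-exhaustion branch
// never runs; with a null TaskContext, no completion listener runs either,
// so the Arrow buffers allocated by next() are never released.
new ArrowBatchWithSchemaIterator(
    Iterator.empty, schema, 0L, 0L,
    timeZoneId, errorOnDuplicatedFieldNames, TaskContext.get) {
  override def hasNext: Boolean = true
}.next()
```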
In Spark Connect, `createEmptyArrowBatch` is called in [SparkConnectPlanner](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L2558) and [SparkConnectPlanExecution](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala#L224), which causes a long-running driver to consume all off-heap memory sp [...]
This is the exception stack:
```
org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
    at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67)
    at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
    at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
    at org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
    at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:354)
    at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:349)
    at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:337)
    at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:315)
    at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:279)
    at org.apache.arrow.vector.BaseValueVector.allocFixedDataAndValidityBufs(BaseValueVector.java:192)
    at org.apache.arrow.vector.BaseFixedWidthVector.allocateBytes(BaseFixedWidthVector.java:338)
    at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:308)
    at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:273)
    at org.apache.spark.sql.execution.arrow.ArrowWriter$.$anonfun$create$1(ArrowWriter.scala:44)
    at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
    at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
    at scala.collection.convert.JavaCollectionWrappers$JListWrapper.map(JavaCollectionWrappers.scala:103)
    at org.apache.spark.sql.execution.arrow.ArrowWriter$.create(ArrowWriter.scala:43)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.<init>(ArrowConverters.scala:93)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.<init>(ArrowConverters.scala:138)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.<init>(ArrowConverters.scala:231)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$.createEmptyArrowBatch(ArrowConverters.scala:229)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2481)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2426)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:189)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:189)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:176)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:178)
    at org.apache.spark.sql.connect.service.SessionHolder.withContextClassLoader(SessionHolder.scala:175)
    at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:188)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:132)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:84)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:228)
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 4194304 byte(s) of direct memory (used: 1069547799, max: 1073741824)
    at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:845)
    at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:774)
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:721)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:696)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:215)
    at io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:180)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:137)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:129)
    at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:181)
    at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:214)
    at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
    ... 37 more
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually tested.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43691 from xieshuaihu/spark-45814.
Authored-by: xieshuaihu <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
(cherry picked from commit c128f811820e5a31ddd5bd1c95ed8dd49017eaea)
Signed-off-by: yangjie01 <[email protected]>
---
.../sql/execution/arrow/ArrowConverters.scala | 25 ++++++++++++++++------
1 file changed, 18 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index 86dd7984b585..a843582e9c2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -80,7 +80,7 @@ private[sql] object ArrowConverters extends Logging {
       maxRecordsPerBatch: Long,
       timeZoneId: String,
       errorOnDuplicatedFieldNames: Boolean,
-      context: TaskContext) extends Iterator[Array[Byte]] {
+      context: TaskContext) extends Iterator[Array[Byte]] with AutoCloseable {
 
     protected val arrowSchema =
       ArrowUtils.toArrowSchema(schema, timeZoneId, errorOnDuplicatedFieldNames)
@@ -93,13 +93,11 @@ private[sql] object ArrowConverters extends Logging {
     protected val arrowWriter = ArrowWriter.create(root)
 
     Option(context).foreach {_.addTaskCompletionListener[Unit] { _ =>
-      root.close()
-      allocator.close()
+      close()
     }}
 
     override def hasNext: Boolean = rowIter.hasNext || {
-      root.close()
-      allocator.close()
+      close()
       false
     }
 
@@ -124,6 +122,11 @@ private[sql] object ArrowConverters extends Logging {
       out.toByteArray
     }
+
+    override def close(): Unit = {
+      root.close()
+      allocator.close()
+    }
   }
 
   private[sql] class ArrowBatchWithSchemaIterator(
@@ -226,11 +229,19 @@ private[sql] object ArrowConverters extends Logging {
       schema: StructType,
       timeZoneId: String,
       errorOnDuplicatedFieldNames: Boolean): Array[Byte] = {
-    new ArrowBatchWithSchemaIterator(
+    val batches = new ArrowBatchWithSchemaIterator(
       Iterator.empty, schema, 0L, 0L,
       timeZoneId, errorOnDuplicatedFieldNames, TaskContext.get) {
       override def hasNext: Boolean = true
-    }.next()
+    }
+    Utils.tryWithSafeFinally {
+      batches.next()
+    } {
+      // If taskContext is null, `batches.close()` should be called to avoid memory leak.
+      if (TaskContext.get() == null) {
+        batches.close()
+      }
+    }
   }
 
   /**
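For readers unfamiliar with the `Utils.tryWithSafeFinally` used above: it behaves like try/finally, except that an exception thrown by the cleanup block is attached as a suppressed exception rather than masking one thrown by the body. A rough sketch of the idea (not Spark's exact implementation):

```scala
// Rough sketch of the try-with-safe-finally idea (not Spark's exact code):
// always run the cleanup block, but never let a cleanup failure mask an
// exception already thrown by the body.
def tryWithSafeFinallySketch[T](block: => T)(finallyBlock: => Unit): T = {
  var original: Throwable = null
  try {
    block
  } catch {
    case t: Throwable =>
      original = t
      throw t
  } finally {
    try {
      finallyBlock
    } catch {
      case t: Throwable =>
        if (original != null) original.addSuppressed(t) else throw t
    }
  }
}
```

Note that the patch closes `batches` in the cleanup block only when `TaskContext.get()` is null; when a task context exists, the completion listener registered in the iterator's constructor performs the cleanup instead.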