This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 259ac250017 [SPARK-45814][CONNECT][SQL][3.4] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak
259ac250017 is described below

commit 259ac250017bcc1805f6cb44a5e7eedf9e552a98
Author: xieshuaihu <xieshua...@agora.io>
AuthorDate: Fri Nov 10 12:33:24 2023 +0800

    [SPARK-45814][CONNECT][SQL][3.4] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak

    ### What changes were proposed in this pull request?

    Make `ArrowBatchIterator` implement `AutoCloseable` and have `ArrowConverters.createEmptyArrowBatch()` call `close()`, to avoid a memory leak.

    ### Why are the changes needed?

    `ArrowConverters.createEmptyArrowBatch` doesn't call `super.hasNext`; if `TaskContext.get` returns `None`, the memory allocated in `ArrowBatchIterator` is leaked. In Spark Connect, `createEmptyArrowBatch` is called in [SparkConnectPlanner](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L2558) and [SparkConnectPlanExecution](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala#L224), which causes a long-running driver to consume all off-heap memory sp [...]

    This is the exception stack:

    ```
    org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
        at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67)
        at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
        at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
        at org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
        at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:354)
        at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:349)
        at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:337)
        at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:315)
        at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:279)
        at org.apache.arrow.vector.BaseValueVector.allocFixedDataAndValidityBufs(BaseValueVector.java:192)
        at org.apache.arrow.vector.BaseFixedWidthVector.allocateBytes(BaseFixedWidthVector.java:338)
        at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:308)
        at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:273)
        at org.apache.spark.sql.execution.arrow.ArrowWriter$.$anonfun$create$1(ArrowWriter.scala:44)
        at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
        at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
        at scala.collection.convert.JavaCollectionWrappers$JListWrapper.map(JavaCollectionWrappers.scala:103)
        at org.apache.spark.sql.execution.arrow.ArrowWriter$.create(ArrowWriter.scala:43)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.<init>(ArrowConverters.scala:93)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.<init>(ArrowConverters.scala:138)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.<init>(ArrowConverters.scala:231)
        at org.apache.spark.sql.execution.arrow.ArrowConverters$.createEmptyArrowBatch(ArrowConverters.scala:229)
        at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2481)
        at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2426)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
        at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:189)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:189)
        at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:176)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:178)
        at org.apache.spark.sql.connect.service.SessionHolder.withContextClassLoader(SessionHolder.scala:175)
        at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:188)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:132)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:84)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:228)
    Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 4194304 byte(s) of direct memory (used: 1069547799, max: 1073741824)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:845)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:774)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:721)
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:696)
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:215)
        at io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:180)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:137)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:129)
        at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:181)
        at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:214)
        at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
        ... 37 more
    ```

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Manually tested.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #43728 from xieshuaihu/3_4_SPARK-45814.

    Authored-by: xieshuaihu <xieshua...@agora.io>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
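To make the leak concrete, here is a minimal, self-contained Scala sketch of the pattern described above. It is illustrative only, not the Spark code: `Allocator`, `BatchIterator`, and `LeakDemo` are hypothetical stand-ins for Arrow's `BufferAllocator` and Spark's `ArrowBatchIterator`, and a plain `try`/`finally` stands in for `Utils.tryWithSafeFinally`.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for Arrow's BufferAllocator: counts open
// instances so a leak is directly observable.
object Allocator { val open = new AtomicInteger(0) }

class Allocator extends AutoCloseable {
  Allocator.open.incrementAndGet()
  override def close(): Unit = { Allocator.open.decrementAndGet(); () }
}

// Pre-patch shape: the resource is acquired in the constructor, and the
// only release paths are draining hasNext or a task-completion listener
// (absent here, mirroring the TaskContext.get == null case).
class BatchIterator(rows: Iterator[Int])
    extends Iterator[Array[Byte]] with AutoCloseable {
  private val allocator = new Allocator

  override def hasNext: Boolean = rows.hasNext || { close(); false }
  override def next(): Array[Byte] = Array.emptyByteArray

  // What the patch adds: an explicit release path for callers.
  override def close(): Unit = allocator.close()
}

object LeakDemo extends App {
  // The bug: hasNext is overridden to true, so the drain path that would
  // close the allocator never runs, and nothing else releases it.
  val leaky = new BatchIterator(Iterator.empty) {
    override def hasNext: Boolean = true
  }
  leaky.next()
  println(s"open after leaky call: ${Allocator.open.get()}") // prints 1: leaked

  // The fix: close explicitly when no task will clean up for us.
  val batches = new BatchIterator(Iterator.empty) {
    override def hasNext: Boolean = true
  }
  try batches.next() finally batches.close()
  println(s"open after fixed call: ${Allocator.open.get()}") // still 1: only the first call leaked
}
```

The design point is the same as in the patch below: cleanup that only runs on the drain path of `hasNext`, or in a task-completion listener, is silently skipped once `hasNext` is overridden and no `TaskContext` exists, so exposing `close()` and invoking it in a `finally` block restores a deterministic release path.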
---
 .../sql/execution/arrow/ArrowConverters.scala      | 25 ++++++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index df26a06c86d..aa018efc1f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -77,7 +77,7 @@ private[sql] object ArrowConverters extends Logging {
       schema: StructType,
       maxRecordsPerBatch: Long,
       timeZoneId: String,
-      context: TaskContext) extends Iterator[Array[Byte]] {
+      context: TaskContext) extends Iterator[Array[Byte]] with AutoCloseable {
 
     protected val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
     private val allocator =
@@ -89,13 +89,11 @@ private[sql] object ArrowConverters extends Logging {
     protected val arrowWriter = ArrowWriter.create(root)
 
     Option(context).foreach {_.addTaskCompletionListener[Unit] { _ =>
-      root.close()
-      allocator.close()
+      close()
     }}
 
     override def hasNext: Boolean = rowIter.hasNext || {
-      root.close()
-      allocator.close()
+      close()
       false
     }
 
@@ -120,6 +118,11 @@ private[sql] object ArrowConverters extends Logging {
 
       out.toByteArray
     }
+
+    override def close(): Unit = {
+      root.close()
+      allocator.close()
+    }
   }
 
   private[sql] class ArrowBatchWithSchemaIterator(
@@ -217,10 +220,18 @@ private[sql] object ArrowConverters extends Logging {
   private[sql] def createEmptyArrowBatch(
       schema: StructType,
       timeZoneId: String): Array[Byte] = {
-    new ArrowBatchWithSchemaIterator(
+    val batches = new ArrowBatchWithSchemaIterator(
       Iterator.empty, schema, 0L, 0L, timeZoneId, TaskContext.get) {
       override def hasNext: Boolean = true
-    }.next()
+    }
+    Utils.tryWithSafeFinally {
+      batches.next()
+    } {
+      // If taskContext is null, `batches.close()` should be called to avoid memory leak.
+      if (TaskContext.get() == null) {
+        batches.close()
+      }
+    }
   }
 
   /**

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org