viirya commented on code in PR #929:
URL: https://github.com/apache/datafusion-comet/pull/929#discussion_r1750369719


##########
common/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala:
##########
@@ -88,20 +88,20 @@ class CometBlockStoreShuffleReader[K, C](
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
+    var currentReadIterator: ArrowReaderIterator = null
+
+    // Closes last read iterator after the task is finished.
+    // We need to close read iterator during iterating input streams,
+    // instead of one callback per read iterator. Otherwise if there are too many
+    // read iterators, it may blow up the call stack and cause OOM.
+    context.addTaskCompletionListener[Unit] { _ =>
+      if (currentReadIterator != null) {
+        currentReadIterator.close()
+      }
+    }

Review Comment:
   Based on the issue description, this is the major fix, right?
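
   For reference, here is a minimal sketch of the pattern as I read it from the code comment: register one completion listener per task that closes whichever iterator is still open, and close each earlier iterator while moving through the input streams. `FakeTaskContext` and `FakeReader` are hypothetical stand-ins, not the Spark `TaskContext` or Comet's `ArrowReaderIterator`, and the point where the PR closes the previous iterator is not visible in this hunk, so that part is an assumption.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a task context that runs callbacks at task end.
final class FakeTaskContext {
  private val listeners = ArrayBuffer.empty[() => Unit]
  def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
  def listenerCount: Int = listeners.size
  def markTaskCompleted(): Unit = listeners.foreach(_.apply())
}

// Hypothetical stand-in for a closeable per-stream read iterator.
final class FakeReader(val id: Int) {
  var closed: Boolean = false
  def close(): Unit = { closed = true }
  def rows: Iterator[Int] = Iterator(id * 10, id * 10 + 1)
}

object SingleListenerSketch {
  def read(context: FakeTaskContext, readers: Seq[FakeReader]): Iterator[Int] = {
    var current: FakeReader = null

    // One listener for the whole task; it closes only the last open reader.
    context.addTaskCompletionListener { () =>
      if (current != null) current.close()
    }

    readers.iterator.flatMap { next =>
      // Assumption: the previous reader is closed when advancing to the next
      // stream, so no per-reader callback is ever registered.
      if (current != null) current.close()
      current = next
      next.rows
    }
  }

  def main(args: Array[String]): Unit = {
    val ctx = new FakeTaskContext
    val readers = (1 to 3).map(new FakeReader(_))
    val out = read(ctx, readers).toList
    ctx.markTaskCompleted()
    assert(ctx.listenerCount == 1)    // a single callback regardless of reader count
    assert(readers.forall(_.closed))  // every reader was closed
    println(out)
  }
}
```

   With this shape the number of registered callbacks stays at one no matter how many read iterators the task creates.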



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

