sunchao commented on code in PR #211:
URL: 
https://github.com/apache/arrow-datafusion-comet/pull/211#discussion_r1535130870


##########
spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala:
##########
@@ -257,9 +257,28 @@ class CometBatchRDD(
   }
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[ColumnarBatch] = {
-    val partition = split.asInstanceOf[CometBatchPartition]
+    new Iterator[ColumnarBatch] {
+      val partition = split.asInstanceOf[CometBatchPartition]
+      val batchesIter = 
partition.value.value.map(CometExec.decodeBatches(_)).toIterator
+      var iter: Iterator[ColumnarBatch] = null
+
+      override def hasNext: Boolean = {
+        if (iter != null) {
+          if (iter.hasNext) {
+            return true
+          }
+        }
+        if (batchesIter.hasNext) {
+          iter = batchesIter.next()

Review Comment:
   Is it possible that the iterator returned by `batchesIter.next()` is empty 
while a later item in `batchesIter` still has elements? E.g.: `[1, 2, 3], [], [4, 5, 6]`?



##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -644,10 +681,11 @@ case class CometSortMergeJoinExec(
 
   override def equals(obj: Any): Boolean = {
     obj match {
-      case other: CometSortMergeJoinExec =>
+      case other: CometBroadcastHashJoinExec =>
         this.leftKeys == other.leftKeys &&
         this.rightKeys == other.rightKeys &&
         this.condition == other.condition &&
+        this.buildSide == other.buildSide &&

Review Comment:
   Should `hashCode` also take `buildSide` into account?



##########
spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala:
##########
@@ -257,9 +257,28 @@ class CometBatchRDD(
   }
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[ColumnarBatch] = {
-    val partition = split.asInstanceOf[CometBatchPartition]
+    new Iterator[ColumnarBatch] {
+      val partition = split.asInstanceOf[CometBatchPartition]
+      val batchesIter = 
partition.value.value.map(CometExec.decodeBatches(_)).toIterator
+      var iter: Iterator[ColumnarBatch] = null
+
+      override def hasNext: Boolean = {
+        if (iter != null) {
+          if (iter.hasNext) {

Review Comment:
   nit: the two `if`s can be combined into one.



##########
common/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/ArrowReaderIterator.scala:
##########
@@ -36,6 +36,13 @@ class ArrowReaderIterator(channel: ReadableByteChannel) 
extends Iterator[Columna
       return true
     }
 
+    // Release the previous batch.
+    // If it is not released, when closing the reader, arrow library will 
complain about
+    // memory leak.
+    if (currentBatch != null) {
+      currentBatch.close()
+    }
+

Review Comment:
   Is this related to the memory leak we saw?



##########
spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala:
##########
@@ -257,9 +257,28 @@ class CometBatchRDD(
   }
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[ColumnarBatch] = {
-    val partition = split.asInstanceOf[CometBatchPartition]
+    new Iterator[ColumnarBatch] {
+      val partition = split.asInstanceOf[CometBatchPartition]
+      val batchesIter = 
partition.value.value.map(CometExec.decodeBatches(_)).toIterator

Review Comment:
   nit: I think `(_)` is not necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to