sunchao commented on code in PR #211:
URL: https://github.com/apache/arrow-datafusion-comet/pull/211#discussion_r1535130870
##########
spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala:
##########
@@ -257,9 +257,28 @@ class CometBatchRDD(
}
override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
- val partition = split.asInstanceOf[CometBatchPartition]
+ new Iterator[ColumnarBatch] {
+ val partition = split.asInstanceOf[CometBatchPartition]
+ val batchesIter = partition.value.value.map(CometExec.decodeBatches(_)).toIterator
+ var iter: Iterator[ColumnarBatch] = null
+
+ override def hasNext: Boolean = {
+ if (iter != null) {
+ if (iter.hasNext) {
+ return true
+ }
+ }
+ if (batchesIter.hasNext) {
+ iter = batchesIter.next()
Review Comment:
Is it possible that the iterator returned by this `batchesIter.next()` has no more elements, while a later item in `batchesIter` still does? e.g.: `[1, 2, 3], [], [4, 5, 6]`?
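For illustration, a self-contained sketch of the case I have in mind (plain `Int`s stand in for `ColumnarBatch`; the object and value names are hypothetical, not from the PR). Looping in `hasNext` until a non-empty inner iterator is found, or simply using `Iterator.flatMap`/`flatten`, skips past an empty inner iterator:

```scala
object EmptyInnerIteratorSketch extends App {
  // [1, 2, 3], [], [4, 5, 6]: the middle inner iterator is empty.
  val batchesIter: Iterator[Iterator[Int]] =
    Iterator(Iterator(1, 2, 3), Iterator.empty, Iterator(4, 5, 6))

  val flattened = new Iterator[Int] {
    private var iter: Iterator[Int] = Iterator.empty

    override def hasNext: Boolean = {
      // Keep advancing past exhausted or empty inner iterators.
      while (!iter.hasNext && batchesIter.hasNext) {
        iter = batchesIter.next()
      }
      iter.hasNext
    }

    override def next(): Int = {
      if (!hasNext) throw new NoSuchElementException
      iter.next()
    }
  }

  // The empty inner iterator is skipped rather than ending iteration early.
  assert(flattened.toList == List(1, 2, 3, 4, 5, 6))
}
```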
##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -644,10 +681,11 @@ case class CometSortMergeJoinExec(
override def equals(obj: Any): Boolean = {
obj match {
- case other: CometSortMergeJoinExec =>
+ case other: CometBroadcastHashJoinExec =>
this.leftKeys == other.leftKeys &&
this.rightKeys == other.rightKeys &&
this.condition == other.condition &&
+ this.buildSide == other.buildSide &&
Review Comment:
also take `buildSide` into account for `hash`?
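(Assuming `hash` here means the node's `hashCode`.) A minimal standalone sketch, with a hypothetical class and simplified field types rather than the real `CometBroadcastHashJoinExec`, which carries more state: hash over the same fields that `equals` compares, `buildSide` included.

```scala
import java.util.Objects

// Hypothetical stand-in for a join node; field names mirror the diff above.
final class JoinNodeSketch(
    val leftKeys: Seq[String],
    val rightKeys: Seq[String],
    val condition: Option[String],
    val buildSide: String) {

  override def equals(obj: Any): Boolean = obj match {
    case other: JoinNodeSketch =>
      leftKeys == other.leftKeys &&
        rightKeys == other.rightKeys &&
        condition == other.condition &&
        buildSide == other.buildSide
    case _ => false
  }

  // Hash the same fields that equals compares, including buildSide.
  override def hashCode(): Int =
    Objects.hash(leftKeys, rightKeys, condition, buildSide)
}
```

Strictly speaking, leaving a compared field out of `hashCode` does not break the equals/hashCode contract, but hashing the same fields keeps the two consistent and distinguishes nodes that differ only in `buildSide`.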
##########
spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala:
##########
@@ -257,9 +257,28 @@ class CometBatchRDD(
}
override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
- val partition = split.asInstanceOf[CometBatchPartition]
+ new Iterator[ColumnarBatch] {
+ val partition = split.asInstanceOf[CometBatchPartition]
+ val batchesIter = partition.value.value.map(CometExec.decodeBatches(_)).toIterator
+ var iter: Iterator[ColumnarBatch] = null
+
+ override def hasNext: Boolean = {
+ if (iter != null) {
+ if (iter.hasNext) {
Review Comment:
nit: can just combine the two `if`s
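A tiny runnable illustration of the nit (hypothetical names, plain `Int`s): short-circuit `&&` turns the nested null check and `hasNext` check into a single condition.

```scala
object CombineIfsSketch extends App {
  var iter: Iterator[Int] = null

  // Equivalent to: if (iter != null) { if (iter.hasNext) return true }
  def hasBuffered: Boolean = iter != null && iter.hasNext

  assert(!hasBuffered)  // null iterator: && short-circuits, no NPE
  iter = Iterator(1)
  assert(hasBuffered)   // non-empty buffered iterator
  iter = Iterator.empty
  assert(!hasBuffered)  // exhausted buffered iterator
}
```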
##########
common/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/ArrowReaderIterator.scala:
##########
@@ -36,6 +36,13 @@ class ArrowReaderIterator(channel: ReadableByteChannel) extends Iterator[ColumnarBatch]
return true
}
+ // Release the previous batch.
+ // If it is not released, when closing the reader, arrow library will complain about
+ // memory leak.
+ if (currentBatch != null) {
+ currentBatch.close()
+ }
+
Review Comment:
Is this related to the memory leak we saw?
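For reference, a self-contained sketch of the pattern this hunk introduces (a `FakeBatch` implementing `AutoCloseable` stands in for `ColumnarBatch`; all names here are hypothetical): the previously returned batch is closed in `hasNext` before the iterator advances, so nothing is left open by the time the reader itself is closed.

```scala
object CloseBeforeAdvanceSketch extends App {
  final class FakeBatch(val id: Int) extends AutoCloseable {
    var closed = false
    override def close(): Unit = closed = true
  }

  val source = Iterator(new FakeBatch(1), new FakeBatch(2), new FakeBatch(3))
  val handedOut = scala.collection.mutable.ArrayBuffer.empty[FakeBatch]

  val reader = new Iterator[FakeBatch] {
    private var currentBatch: FakeBatch = _

    override def hasNext: Boolean = {
      // Release the previous batch before looking for the next one.
      if (currentBatch != null) {
        currentBatch.close()
        currentBatch = null
      }
      source.hasNext
    }

    override def next(): FakeBatch = {
      currentBatch = source.next()
      handedOut += currentBatch
      currentBatch
    }
  }

  reader.foreach(_ => ()) // consume everything
  assert(handedOut.forall(_.closed)) // each batch was released by hasNext
}
```

This only works because the consumer is done with a batch before it calls `hasNext` again, which I believe matches how these batches are consumed downstream.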
##########
spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala:
##########
@@ -257,9 +257,28 @@ class CometBatchRDD(
}
override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
- val partition = split.asInstanceOf[CometBatchPartition]
+ new Iterator[ColumnarBatch] {
+ val partition = split.asInstanceOf[CometBatchPartition]
+ val batchesIter = partition.value.value.map(CometExec.decodeBatches(_)).toIterator
Review Comment:
nit: I think `(_)` is not necessary?
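A one-liner to illustrate (with a hypothetical `decode` standing in for `CometExec.decodeBatches`): eta-expansion makes `.map(f)` and `.map(f(_))` equivalent for a single-argument method, so the placeholder can be dropped.

```scala
object PlaceholderNitSketch extends App {
  // Hypothetical single-argument method; stands in for CometExec.decodeBatches.
  def decode(x: Int): Seq[Int] = Seq(x, x)

  // With and without the placeholder, the result is identical.
  assert(Seq(1, 2, 3).map(decode) == Seq(1, 2, 3).map(decode(_)))
}
```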