This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 7ab37ebd simplify code in CometExecIterator and avoid some small overhead (#522)
7ab37ebd is described below
commit 7ab37ebdac1e077e157e292fbddf7ca23e78429c
Author: Andy Grove <[email protected]>
AuthorDate: Wed Jun 5 15:31:16 2024 -0600
simplify code in CometExecIterator and avoid some small overhead (#522)
---
.../scala/org/apache/comet/CometExecIterator.scala | 41 ++++++++--------------
1 file changed, 14 insertions(+), 27 deletions(-)
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index b3604c9e..89225c0d 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -67,20 +67,6 @@ class CometExecIterator(
private var currentBatch: ColumnarBatch = null
private var closed: Boolean = false
- private def executeNative(): ExecutionState = {
- val result = nativeLib.executePlan(plan)
-
- val flag = result(0)
- if (flag == -1) EOF
- else if (flag == 1) {
- val numRows = result(1)
- val addresses = result.slice(2, result.length)
- Batch(numRows = numRows.toInt, addresses = addresses)
- } else {
- throw new IllegalStateException(s"Invalid native flag: $flag")
- }
- }
-
/**
* Creates a new configuration map to be passed to the native side.
*/
@@ -110,21 +96,22 @@ class CometExecIterator(
result
}
- /** Execution result from Comet native */
- trait ExecutionState
-
- /** A new batch is available */
- case class Batch(numRows: Int, addresses: Array[Long]) extends ExecutionState
-
- /** The execution is finished - no more batch */
- case object EOF extends ExecutionState
-
def getNextBatch(): Option[ColumnarBatch] = {
- executeNative() match {
- case EOF => None
- case Batch(numRows, addresses) =>
+ // we execute the native plan each time we need another output batch and this could
+ // result in multiple input batches being processed
+ val result = nativeLib.executePlan(plan)
+
+ result(0) match {
+ case -1 =>
+ // EOF
+ None
+ case 1 =>
+ val numRows = result(1)
+ val addresses = result.slice(2, result.length)
val cometVectors = nativeUtil.importVector(addresses)
- Some(new ColumnarBatch(cometVectors.toArray, numRows))
+ Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt))
+ case flag =>
+ throw new IllegalStateException(s"Invalid native flag: $flag")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]