This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ab37ebd simplify code in CometExecIterator and avoid some small overhead (#522)
7ab37ebd is described below

commit 7ab37ebdac1e077e157e292fbddf7ca23e78429c
Author: Andy Grove <[email protected]>
AuthorDate: Wed Jun 5 15:31:16 2024 -0600

    simplify code in CometExecIterator and avoid some small overhead (#522)
---
 .../scala/org/apache/comet/CometExecIterator.scala | 41 ++++++++--------------
 1 file changed, 14 insertions(+), 27 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index b3604c9e..89225c0d 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -67,20 +67,6 @@ class CometExecIterator(
   private var currentBatch: ColumnarBatch = null
   private var closed: Boolean = false
 
-  private def executeNative(): ExecutionState = {
-    val result = nativeLib.executePlan(plan)
-
-    val flag = result(0)
-    if (flag == -1) EOF
-    else if (flag == 1) {
-      val numRows = result(1)
-      val addresses = result.slice(2, result.length)
-      Batch(numRows = numRows.toInt, addresses = addresses)
-    } else {
-      throw new IllegalStateException(s"Invalid native flag: $flag")
-    }
-  }
-
   /**
    * Creates a new configuration map to be passed to the native side.
    */
@@ -110,21 +96,22 @@ class CometExecIterator(
     result
   }
 
-  /** Execution result from Comet native */
-  trait ExecutionState
-
-  /** A new batch is available */
-  case class Batch(numRows: Int, addresses: Array[Long]) extends ExecutionState
-
-  /** The execution is finished - no more batch */
-  case object EOF extends ExecutionState
-
   def getNextBatch(): Option[ColumnarBatch] = {
-    executeNative() match {
-      case EOF => None
-      case Batch(numRows, addresses) =>
+    // we execute the native plan each time we need another output batch and this could
+    // result in multiple input batches being processed
+    val result = nativeLib.executePlan(plan)
+
+    result(0) match {
+      case -1 =>
+        // EOF
+        None
+      case 1 =>
+        val numRows = result(1)
+        val addresses = result.slice(2, result.length)
         val cometVectors = nativeUtil.importVector(addresses)
-        Some(new ColumnarBatch(cometVectors.toArray, numRows))
+        Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt))
+      case flag =>
+        throw new IllegalStateException(s"Invalid native flag: $flag")
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to