This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ba42af81 fix: Fall back to Spark for `RANGE BETWEEN` window expressions (#1848)
2ba42af81 is described below

commit 2ba42af81ac279ffc57320e766dbc88cf3ccfdc0
Author: Andy Grove <agr...@apache.org>
AuthorDate: Thu Jun 5 12:20:41 2025 -0600

    fix: Fall back to Spark for `RANGE BETWEEN` window expressions (#1848)
---
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 13 ++++++++---
 .../org/apache/comet/CometExpressionSuite.scala    | 25 +++++++++++++++++++++-
 2 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 3fedaa7e3..13bea457d 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -261,7 +261,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
               .newBuilder()
               .setCurrentRow(OperatorOuterClass.CurrentRow.newBuilder().build())
               .build()
-          case e =>
+          case e if frameType == RowFrame =>
             val offset = e.eval() match {
               case i: Integer => i.toLong
               case l: Long => l
@@ -275,6 +275,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
                   .setOffset(offset)
                   .build())
               .build()
+          case _ =>
+          case _ =>
+            // TODO add support for numeric and temporal RANGE BETWEEN expressions
+            // see https://github.com/apache/datafusion-comet/issues/1246
+            return None
         }
 
         val uBoundProto = uBound match {
@@ -288,13 +292,12 @@ object QueryPlanSerde extends Logging with CometExprShim {
               .newBuilder()
               .setCurrentRow(OperatorOuterClass.CurrentRow.newBuilder().build())
               .build()
-          case e =>
+          case e if frameType == RowFrame =>
             val offset = e.eval() match {
               case i: Integer => i.toLong
               case l: Long => l
               case _ => return None
             }
-
             OperatorOuterClass.UpperWindowFrameBound
               .newBuilder()
               .setFollowing(
@@ -303,6 +306,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
                   .setOffset(offset)
                   .build())
               .build()
+          case _ =>
+            // TODO add support for numeric and temporal RANGE BETWEEN expressions
+            // see https://github.com/apache/datafusion-comet/issues/1246
+            return None
         }
 
         (frameProto, lBoundProto, uBoundProto)
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 6273ab9b0..d3113553e 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -21,6 +21,7 @@ package org.apache.comet
 
 import java.time.{Duration, Period}
 
+import scala.collection.immutable.Seq
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.Random
@@ -28,9 +29,10 @@ import scala.util.Random
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
-import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec}
+import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec, CometWindowExec}
 import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
@@ -2705,4 +2707,25 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
   }
 
+  test("window query with rangeBetween") {
+
+    // values are int
+    val df = Seq(1, 2, 4, 3, 2, 1).toDF("value")
+    val window = Window.orderBy($"value".desc)
+
+    // ranges are long
+    val df2 = df.select(
+      $"value",
+      sum($"value").over(window.rangeBetween(Window.unboundedPreceding, 1L)),
+      sum($"value").over(window.rangeBetween(1L, Window.unboundedFollowing)))
+
+    // Comet does not support RANGE BETWEEN
+    // https://github.com/apache/datafusion-comet/issues/1246
+    val (_, cometPlan) = checkSparkAnswer(df2)
+    val cometWindowExecs = collect(cometPlan) { case w: CometWindowExec =>
+      w
+    }
+    assert(cometWindowExecs.isEmpty)
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to