This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new 2ba42af81 fix: Fall back to Spark for `RANGE BETWEEN` window expressions (#1848) 2ba42af81 is described below commit 2ba42af81ac279ffc57320e766dbc88cf3ccfdc0 Author: Andy Grove <agr...@apache.org> AuthorDate: Thu Jun 5 12:20:41 2025 -0600 fix: Fall back to Spark for `RANGE BETWEEN` window expressions (#1848) --- .../org/apache/comet/serde/QueryPlanSerde.scala | 13 ++++++++--- .../org/apache/comet/CometExpressionSuite.scala | 25 +++++++++++++++++++++- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 3fedaa7e3..13bea457d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -261,7 +261,7 @@ object QueryPlanSerde extends Logging with CometExprShim { .newBuilder() .setCurrentRow(OperatorOuterClass.CurrentRow.newBuilder().build()) .build() - case e => + case e if frameType == RowFrame => val offset = e.eval() match { case i: Integer => i.toLong case l: Long => l @@ -275,6 +275,10 @@ object QueryPlanSerde extends Logging with CometExprShim { .setOffset(offset) .build()) .build() + case _ => + // TODO add support for numeric and temporal RANGE BETWEEN expressions + // see https://github.com/apache/datafusion-comet/issues/1246 + return None } val uBoundProto = uBound match { @@ -288,13 +292,12 @@ object QueryPlanSerde extends Logging with CometExprShim { .newBuilder() .setCurrentRow(OperatorOuterClass.CurrentRow.newBuilder().build()) .build() - case e => + case e if frameType == RowFrame => val offset = e.eval() match { case i: Integer => i.toLong case l: Long => l case _ => return None } - OperatorOuterClass.UpperWindowFrameBound .newBuilder() .setFollowing( @@ -303,6 +306,10 @@ object QueryPlanSerde extends Logging with CometExprShim { .setOffset(offset) 
.build()) .build() + case _ => + // TODO add support for numeric and temporal RANGE BETWEEN expressions + // see https://github.com/apache/datafusion-comet/issues/1246 + return None } (frameProto, lBoundProto, uBoundProto) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 6273ab9b0..d3113553e 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -21,6 +21,7 @@ package org.apache.comet import java.time.{Duration, Period} +import scala.collection.immutable.Seq import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag import scala.util.Random @@ -28,9 +29,10 @@ import scala.util.Random import org.apache.hadoop.fs.Path import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps -import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec} +import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec, CometWindowExec} import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE @@ -2705,4 +2707,25 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("window query with rangeBetween") { + + // values are int + val df = Seq(1, 2, 4, 3, 2, 1).toDF("value") + val window = Window.orderBy($"value".desc) + + // ranges are long + val df2 = df.select( + $"value", + sum($"value").over(window.rangeBetween(Window.unboundedPreceding, 1L)), + sum($"value").over(window.rangeBetween(1L, Window.unboundedFollowing))) + + // Comet does not 
support RANGE BETWEEN + // https://github.com/apache/datafusion-comet/issues/1246 + val (_, cometPlan) = checkSparkAnswer(df2) + val cometWindowExecs = collect(cometPlan) { case w: CometWindowExec => + w + } + assert(cometWindowExecs.isEmpty) + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org