This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new c698d82  fix: Comet should not fail on negative limit parameter (#288)
c698d82 is described below

commit c698d826abcd5aed28fc7a3adaef250807bd3866
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Thu Apr 18 21:51:25 2024 -0700

    fix: Comet should not fail on negative limit parameter (#288)
    
    * fix: Comet should not fail on negative limit parameter
    
    * Move test
---
 pom.xml                                            | 12 +++--
 spark/pom.xml                                      |  3 +-
 .../org/apache/comet/serde/QueryPlanSerde.scala    |  4 +-
 .../org/apache/comet/exec/CometExec3_4Suite.scala  | 52 ++++++++++++++++++++++
 4 files changed, 64 insertions(+), 7 deletions(-)

diff --git a/pom.xml b/pom.xml
index c7e417d..f5f9bf2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,7 +86,8 @@ under the License.
       -Djdk.reflect.useDirectMethodHandle=false
     </extraJavaTestArgs>
     <argLine>-ea -Xmx4g -Xss4m ${extraJavaTestArgs}</argLine>
-    <additional.test.source>spark-3.3-plus</additional.test.source>
+    <additional.3_3.test.source>spark-3.3-plus</additional.3_3.test.source>
+    <additional.3_4.test.source>spark-3.4</additional.3_4.test.source>
   </properties>
 
   <dependencyManagement>
@@ -496,7 +497,8 @@ under the License.
         <spark.version.short>3.2</spark.version.short>
         <parquet.version>1.12.0</parquet.version>
         <!-- we don't add special test suits for spark-3.2, so a not existed 
dir is specified-->
-        <additional.test.source>not-needed-yet</additional.test.source>
+        <additional.3_3.test.source>not-needed-yet</additional.3_3.test.source>
+        <additional.3_4.test.source>not-needed-yet</additional.3_4.test.source>
       </properties>
     </profile>
 
@@ -507,7 +509,8 @@ under the License.
         <spark.version>3.3.2</spark.version>
         <spark.version.short>3.3</spark.version.short>
         <parquet.version>1.12.0</parquet.version>
-        <additional.test.source>spark-3.3-plus</additional.test.source>
+        <additional.3_3.test.source>spark-3.3-plus</additional.3_3.test.source>
+        <additional.3_4.test.source>not-needed-yet</additional.3_4.test.source>
       </properties>
     </profile>
 
@@ -517,7 +520,8 @@ under the License.
         <scala.version>2.12.17</scala.version>
         <spark.version.short>3.4</spark.version.short>
         <parquet.version>1.13.1</parquet.version>
-        <additional.test.source>spark-3.3-plus</additional.test.source>
+        <additional.3_3.test.source>spark-3.3-plus</additional.3_3.test.source>
+        <additional.3_4.test.source>spark-3.4</additional.3_4.test.source>
       </properties>
     </profile>
 
diff --git a/spark/pom.xml b/spark/pom.xml
index 31d80bb..e0be7c6 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -245,7 +245,8 @@ under the License.
             </goals>
             <configuration>
               <sources>
-                <source>src/test/${additional.test.source}</source>
+                <source>src/test/${additional.3_3.test.source}</source>
+                <source>src/test/${additional.3_4.test.source}</source>
               </sources>
             </configuration>
           </execution>
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index a0c17fc..555ab40 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1837,14 +1837,14 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde {
         }
 
       case globalLimitExec: GlobalLimitExec if isCometOperatorEnabled(op.conf, 
"global_limit") =>
-        if (childOp.nonEmpty) {
+        // TODO: We don't support negative limit for now.
+        if (childOp.nonEmpty && globalLimitExec.limit >= 0) {
           val limitBuilder = OperatorOuterClass.Limit.newBuilder()
 
           // Spark 3.2 doesn't support offset for GlobalLimit, but newer Spark 
versions
           // support it. Before we upgrade to Spark 3.3, just set it zero.
           // TODO: Spark 3.3 might have negative limit (-1) for Offset usage.
           // When we upgrade to Spark 3.3., we need to address it here.
-          assert(globalLimitExec.limit >= 0, "limit should be greater or equal 
to zero")
           limitBuilder.setLimit(globalLimitExec.limit)
           limitBuilder.setOffset(0)
 
diff --git 
a/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala 
b/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala
new file mode 100644
index 0000000..7644d54
--- /dev/null
+++ b/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.exec
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.spark.sql.CometTestBase
+
+import org.apache.comet.CometConf
+
+/**
+ * This test suite contains tests for only Spark 3.4.
+ */
+class CometExec3_4Suite extends CometTestBase {
+  import testImplicits._
+
+  override protected def test(testName: String, testTags: Tag*)(testFun: => 
Any)(implicit
+      pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+        testFun
+      }
+    }
+  }
+
+  // Dataset.offset API is not available before Spark 3.4
+  test("offset") {
+    withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+      checkSparkAnswer(testData.offset(90))
+      checkSparkAnswer(arrayData.toDF().offset(99))
+      checkSparkAnswer(mapData.toDF().offset(99))
+    }
+  }
+}

Reply via email to