This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new c9008199 fix: limit with offset should return correct results (#359)
c9008199 is described below

commit c90081996c3b0537695f1fd68bdef9aa22e08a68
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Tue Apr 30 21:24:47 2024 -0700

    fix: limit with offset should return correct results (#359)
    
    * fix: limit with offset should return correct results
    
    * Move test
---
 .../apache/comet/CometSparkSessionExtensions.scala |  4 +-
 .../org/apache/comet/exec/CometExec3_4Suite.scala  | 52 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 17085444..1e78013c 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -340,7 +340,7 @@ class CometSparkSessionExtensions
               op
           }
 
-        case op: LocalLimitExec =>
+        case op: LocalLimitExec if getOffset(op) == 0 =>
           val newOp = transform1(op)
           newOp match {
             case Some(nativeOp) =>
@@ -349,7 +349,7 @@ class CometSparkSessionExtensions
               op
           }
 
-        case op: GlobalLimitExec =>
+        case op: GlobalLimitExec if getOffset(op) == 0 =>
           val newOp = transform1(op)
           newOp match {
             case Some(nativeOp) =>
diff --git a/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala b/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala
index 7644d54b..32b76d9b 100644
--- a/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala
+++ b/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala
@@ -41,6 +41,58 @@ class CometExec3_4Suite extends CometTestBase {
     }
   }
 
+  // The syntax is only supported by Spark 3.4+.
+  test("subquery limit: limit with offset should return correct results") {
+    withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+      withTable("t1", "t2") {
+        val table1 =
+          """create temporary view t1 as select * from values
+            |  ("val1a", 6S, 8, 10L, float(15.0), 20D, 20E2BD, timestamp '2014-04-04 01:00:00.000', date '2014-04-04'),
+            |  ("val1b", 8S, 16, 19L, float(17.0), 25D, 26E2BD, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'),
+            |  ("val1a", 16S, 12, 21L, float(15.0), 20D, 20E2BD, timestamp '2014-06-04 01:02:00.001', date '2014-06-04'),
+            |  ("val1a", 16S, 12, 10L, float(15.0), 20D, 20E2BD, timestamp '2014-07-04 01:01:00.000', date '2014-07-04'),
+            |  ("val1c", 8S, 16, 19L, float(17.0), 25D, 26E2BD, timestamp '2014-05-04 01:02:00.001', date '2014-05-05'),
+            |  ("val1d", null, 16, 22L, float(17.0), 25D, 26E2BD, timestamp '2014-06-04 01:01:00.000', null),
+            |  ("val1d", null, 16, 19L, float(17.0), 25D, 26E2BD, timestamp '2014-07-04 01:02:00.001', null),
+            |  ("val1e", 10S, null, 25L, float(17.0), 25D, 26E2BD, timestamp '2014-08-04 01:01:00.000', date '2014-08-04'),
+            |  ("val1e", 10S, null, 19L, float(17.0), 25D, 26E2BD, timestamp '2014-09-04 01:02:00.001', date '2014-09-04'),
+            |  ("val1d", 10S, null, 12L, float(17.0), 25D, 26E2BD, timestamp '2015-05-04 01:01:00.000', date '2015-05-04'),
+            |  ("val1a", 6S, 8, 10L, float(15.0), 20D, 20E2BD, timestamp '2014-04-04 01:02:00.001', date '2014-04-04'),
+            |  ("val1e", 10S, null, 19L, float(17.0), 25D, 26E2BD, timestamp '2014-05-04 01:01:00.000', date '2014-05-04')
+            |  as t1(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i);""".stripMargin
+        val table2 =
+          """create temporary view t2 as select * from values
+            |  ("val2a", 6S, 12, 14L, float(15), 20D, 20E2BD, timestamp '2014-04-04 01:01:00.000', date '2014-04-04'),
+            |  ("val1b", 10S, 12, 19L, float(17), 25D, 26E2BD, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'),
+            |  ("val1b", 8S, 16, 119L, float(17), 25D, 26E2BD, timestamp '2015-05-04 01:01:00.000', date '2015-05-04'),
+            |  ("val1c", 12S, 16, 219L, float(17), 25D, 26E2BD, timestamp '2016-05-04 01:01:00.000', date '2016-05-04'),
+            |  ("val1b", null, 16, 319L, float(17), 25D, 26E2BD, timestamp '2017-05-04 01:01:00.000', null),
+            |  ("val2e", 8S, null, 419L, float(17), 25D, 26E2BD, timestamp '2014-06-04 01:01:00.000', date '2014-06-04'),
+            |  ("val1f", 19S, null, 519L, float(17), 25D, 26E2BD, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'),
+            |  ("val1b", 10S, 12, 19L, float(17), 25D, 26E2BD, timestamp '2014-06-04 01:01:00.000', date '2014-06-04'),
+            |  ("val1b", 8S, 16, 19L, float(17), 25D, 26E2BD, timestamp '2014-07-04 01:01:00.000', date '2014-07-04'),
+            |  ("val1c", 12S, 16, 19L, float(17), 25D, 26E2BD, timestamp '2014-08-04 01:01:00.000', date '2014-08-05'),
+            |  ("val1e", 8S, null, 19L, float(17), 25D, 26E2BD, timestamp '2014-09-04 01:01:00.000', date '2014-09-04'),
+            |  ("val1f", 19S, null, 19L, float(17), 25D, 26E2BD, timestamp '2014-10-04 01:01:00.000', date '2014-10-04'),
+            |  ("val1b", null, 16, 19L, float(17), 25D, 26E2BD, timestamp '2014-05-04 01:01:00.000', null)
+            |  as t2(t2a, t2b, t2c, t2d, t2e, t2f, t2g, t2h, t2i);""".stripMargin
+        sql(table1)
+        sql(table2)
+
+        val df = sql("""SELECT *
+                       |FROM   t1
+                       |WHERE  t1c IN (SELECT t2c
+                       |               FROM   t2
+                       |               WHERE  t2b >= 8
+                       |               LIMIT  2
+                       |               OFFSET 2)
+                       |LIMIT 4
+                       |OFFSET 2;""".stripMargin)
+        checkSparkAnswer(df)
+      }
+    }
+  }
+
   // Dataset.offset API is not available before Spark 3.4
   test("offset") {
     withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to