This is an automated email from the ASF dual-hosted git repository.

lwz9103 pushed a commit to branch liquid
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git

commit a2d33287ec1c4f66a7e8fc1127ecabe6e0a5889f
Author: Chang Chen <[email protected]>
AuthorDate: Tue May 21 11:37:19 2024 +0800

    [KY-SPARK] Fix limit with offset for KySpark according to https://github.com/apache/incubator-gluten/pull/4607
    
    (cherry picked from commit b8e64d09f762e28f6aaff6bbf483d63b4e63a099)
---
 .../gluten/sql/shims/spark33/Spark33Shims.scala      | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 6e3c883320..e7c6e0cd08 100644
--- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, SparkPlan}
+import org.apache.spark.sql.execution.{FileSourceScanExec, GlobalLimitExec, PartitionedFileUtil, SparkPlan, TakeOrderedAndProjectExec}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
@@ -248,6 +248,24 @@ class Spark33Shims extends SparkShims {
     metadataColumn
   }
 
+  private def getLimit(limit: Int, offset: Int): Int = {
+    if (limit == -1) {
+      // Only offset specified, so fetch the maximum number of rows
+      Int.MaxValue
+    } else {
+      assert(limit > offset)
+      limit - offset
+    }
+  }
+
+  override def getLimitAndOffsetFromGlobalLimit(plan: GlobalLimitExec): (Int, Int) = {
+    (getLimit(plan.limit, plan.offset), plan.offset)
+  }
+
+  override def getLimitAndOffsetFromTopK(plan: TakeOrderedAndProjectExec): (Int, Int) = {
+    (getLimit(plan.limit, plan.offset), plan.offset)
+  }
+
   override def getExtendedColumnarPostRules(): List[SparkSession => Rule[SparkPlan]] = {
     List(session => GlutenFormatFactory.getExtendedColumnarPostRule(session))
   }
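
For context, a minimal sketch of how a caller might consume the new shim methods. It is illustrative only and not part of this commit: the `shims` and `plan` values are assumed, and the concrete numbers follow from the `limit - offset` arithmetic in this patch (i.e. they assume `plan.limit` counts the offset rows as well).

    // Hypothetical caller (not part of this commit).
    // Assumes `plan` is the GlobalLimitExec planned for something like
    //   SELECT * FROM t LIMIT 10 OFFSET 3
    // with plan.limit = 13 and plan.offset = 3, i.e. plan.limit includes the
    // offset rows, which is what the `limit - offset` logic above implies.
    val shims = new Spark33Shims()
    val (limit, offset) = shims.getLimitAndOffsetFromGlobalLimit(plan)
    // limit  == 10  (13 - 3): rows to return after skipping
    // offset == 3          : rows to skip first
    // When only OFFSET is specified, plan.limit is -1 and getLimit falls back
    // to Int.MaxValue, per the comment in the patch above.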

