This is an automated email from the ASF dual-hosted git repository. lwz9103 pushed a commit to branch liquid in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
commit a2d33287ec1c4f66a7e8fc1127ecabe6e0a5889f Author: Chang Chen <[email protected]> AuthorDate: Tue May 21 11:37:19 2024 +0800 [KY-SPARK] Fix limit with offset for KySpark according to https://github.com/apache/incubator-gluten/pull/4607 (cherry picked from commit b8e64d09f762e28f6aaff6bbf483d63b4e63a099) --- .../gluten/sql/shims/spark33/Spark33Shims.scala | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala index 6e3c883320..e7c6e0cd08 100644 --- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.catalyst.util.TimestampFormatter import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, SparkPlan} +import org.apache.spark.sql.execution.{FileSourceScanExec, GlobalLimitExec, PartitionedFileUtil, SparkPlan, TakeOrderedAndProjectExec} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters @@ -248,6 +248,24 @@ class Spark33Shims extends SparkShims { metadataColumn } + private def getLimit(limit: Int, offset: Int): Int = { + if (limit == -1) { + // Only offset specified, so fetch the maximum number rows + Int.MaxValue + } else { + assert(limit > offset) + limit - offset + } + } + + override def getLimitAndOffsetFromGlobalLimit(plan: GlobalLimitExec): (Int, Int) = { + (getLimit(plan.limit, plan.offset), plan.offset) + } + + override def getLimitAndOffsetFromTopK(plan: TakeOrderedAndProjectExec): (Int, Int) = { + (getLimit(plan.limit, plan.offset), plan.offset) + } + override def getExtendedColumnarPostRules(): List[SparkSession => Rule[SparkPlan]] = { List(session => GlutenFormatFactory.getExtendedColumnarPostRule(session)) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
