This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 3f4d7ca734 [KYUUBI #6983] Remove support for spark.sql.watchdog.forcedMaxOutputRows
3f4d7ca734 is described below
commit 3f4d7ca73470ee67d566f2953e2ca9831879ea5d
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Mar 17 16:02:27 2025 +0800
[KYUUBI #6983] Remove support for spark.sql.watchdog.forcedMaxOutputRows
### Why are the changes needed?
The feature `spark.sql.watchdog.forcedMaxOutputRows` is a bit hacky: it is essentially a hand-rolled "limit pushdown". We already have a simpler and more reliable way to achieve the same effect via `kyuubi.operation.result.max.rows`.
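
For reference, a minimal sketch of the suggested replacement, assuming a server-wide default is wanted; the value `10000` is only an illustrative example, not a recommendation:

```properties
# kyuubi-defaults.conf
# Cap the number of result rows an operation returns to the client
# (illustrative value; tune to your workload)
kyuubi.operation.result.max.rows=10000
```

As noted in the migration guide below, this works without installing the Kyuubi Spark extension, and it can typically also be overridden per session (for example via connection-level configs).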
### How was this patch tested?
Pass GHA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #6983 from pan3793/rm-forcedMaxOutputRows.
Closes #6983
5e0707955 [Cheng Pan] Remove support for spark.sql.watchdog.forcedMaxOutputRows
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/deployment/migration-guide.md | 4 +
.../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 8 -
.../kyuubi/sql/KyuubiSparkSQLExtension.scala | 3 +-
.../sql/watchdog/ForcedMaxOutputRowsBase.scala | 90 -----
.../sql/watchdog/ForcedMaxOutputRowsRule.scala | 46 ---
.../org/apache/spark/sql/WatchDogSuiteBase.scala | 389 ---------------------
.../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 8 -
.../kyuubi/sql/KyuubiSparkSQLExtension.scala | 3 +-
.../sql/watchdog/ForcedMaxOutputRowsBase.scala | 90 -----
.../sql/watchdog/ForcedMaxOutputRowsRule.scala | 46 ---
.../org/apache/spark/sql/WatchDogSuiteBase.scala | 389 ---------------------
.../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 8 -
.../kyuubi/sql/KyuubiSparkSQLExtension.scala | 3 +-
.../sql/watchdog/ForcedMaxOutputRowsBase.scala | 90 -----
.../sql/watchdog/ForcedMaxOutputRowsRule.scala | 46 ---
.../scala/org/apache/spark/sql/WatchDogSuite.scala | 389 ---------------------
16 files changed, 7 insertions(+), 1605 deletions(-)
diff --git a/docs/deployment/migration-guide.md
b/docs/deployment/migration-guide.md
index 6899359c8a..3570da71ff 100644
--- a/docs/deployment/migration-guide.md
+++ b/docs/deployment/migration-guide.md
@@ -17,6 +17,10 @@
# Kyuubi Migration Guide
+## Upgrading from Kyuubi 1.10 to 1.11
+
+* Since Kyuubi 1.11, the configuration `spark.sql.watchdog.forcedMaxOutputRows` provided by the Kyuubi Spark extension is removed; consider using `kyuubi.operation.result.max.rows` instead. Note that the latter works without requiring the Kyuubi Spark extension to be installed.
+
## Upgrading from Kyuubi 1.9 to 1.10
* Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the
future, please use `kyuubi-beeline` instead.
diff --git
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index e25d8a6777..e72a6c0735 100644
---
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -123,14 +123,6 @@ object KyuubiSQLConf {
.bytesConf(ByteUnit.BYTE)
.createOptional
- val WATCHDOG_FORCED_MAXOUTPUTROWS =
- buildConf("spark.sql.watchdog.forcedMaxOutputRows")
- .doc("Add ForcedMaxOutputRows rule to avoid huge output rows of
non-limit query " +
- "unexpectedly, it's optional that works with defined")
- .version("1.4.0")
- .intConf
- .createOptional
-
val DROP_IGNORE_NONEXISTENT =
buildConf("spark.sql.optimizer.dropIgnoreNonExistent")
.doc("Do not report an error if DROP
DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies " +
diff --git
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index fd11fb5f57..33ff3e3177 100644
---
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.{FinalStageResourceManager,
InjectCustomResourceProfile, SparkSessionExtensions}
-import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule,
KyuubiUnsupportedOperationsCheck, MaxScanStrategy}
+import org.apache.kyuubi.sql.watchdog.{KyuubiUnsupportedOperationsCheck,
MaxScanStrategy}
// scalastyle:off line.size.limit
/**
@@ -38,7 +38,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions
=> Unit) {
// watchdog extension
extensions.injectCheckRule(_ => KyuubiUnsupportedOperationsCheck)
- extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxScanStrategy)
extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
diff --git
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
deleted file mode 100644
index dd329bd659..0000000000
---
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.DataWritingCommand
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/*
- * Add ForcedMaxOutputRows rule for output rows limitation
- * to avoid huge output rows of non_limit query unexpectedly
- * mainly applied to cases as below:
- *
- * case 1:
- * {{{
- * SELECT [c1, c2, ...]
- * }}}
- *
- * case 2:
- * {{{
- * WITH CTE AS (
- * ...)
- * SELECT [c1, c2, ...] FROM CTE ...
- * }}}
- *
- * The Logical Rule add a GlobalLimit node before root project
- * */
-trait ForcedMaxOutputRowsBase extends Rule[LogicalPlan] {
-
- protected def isChildAggregate(a: Aggregate): Boolean
-
- protected def canInsertLimitInner(p: LogicalPlan): Boolean = p match {
- case Aggregate(_, Alias(_, "havingCondition") :: Nil, _) => false
- case agg: Aggregate => !isChildAggregate(agg)
- case _: RepartitionByExpression => true
- case _: Distinct => true
- case _: Filter => true
- case _: Project => true
- case Limit(_, _) => true
- case _: Sort => true
- case Union(children, _, _) =>
- if (children.exists(_.isInstanceOf[DataWritingCommand])) {
- false
- } else {
- true
- }
- case _: MultiInstanceRelation => true
- case _: Join => true
- case _ => false
- }
-
- protected def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt: Option[Int]):
Boolean = {
- maxOutputRowsOpt match {
- case Some(forcedMaxOutputRows) if forcedMaxOutputRows >= 0 =>
- canInsertLimitInner(p) && !p.maxRows.exists(_ <= forcedMaxOutputRows)
- case _ => false
- }
- }
-
- override def apply(plan: LogicalPlan): LogicalPlan = {
- val maxOutputRowsOpt =
conf.getConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS)
- plan match {
- case p if p.resolved && canInsertLimit(p, maxOutputRowsOpt) =>
- Limit(
- maxOutputRowsOpt.get,
- plan)
- case _ => plan
- }
- }
-}
diff --git
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
deleted file mode 100644
index a3d990b109..0000000000
---
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CommandResult,
LogicalPlan, Union, WithCTE}
-import org.apache.spark.sql.execution.command.DataWritingCommand
-
-case class ForcedMaxOutputRowsRule(sparkSession: SparkSession) extends
ForcedMaxOutputRowsBase {
-
- override protected def isChildAggregate(a: Aggregate): Boolean = false
-
- override protected def canInsertLimitInner(p: LogicalPlan): Boolean = p
match {
- case WithCTE(plan, _) => this.canInsertLimitInner(plan)
- case plan: LogicalPlan => plan match {
- case Union(children, _, _) => !children.exists {
- case _: DataWritingCommand => true
- case p: CommandResult if
p.commandLogicalPlan.isInstanceOf[DataWritingCommand] => true
- case _ => false
- }
- case _ => super.canInsertLimitInner(plan)
- }
- }
-
- override protected def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt:
Option[Int]): Boolean = {
- p match {
- case WithCTE(plan, _) => this.canInsertLimit(plan, maxOutputRowsOpt)
- case _ => super.canInsertLimit(p, maxOutputRowsOpt)
- }
- }
-}
diff --git
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
index 22c998d3b0..b5f8c67261 100644
---
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
@@ -22,7 +22,6 @@ import java.io.File
import scala.collection.JavaConverters._
import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
@@ -96,394 +95,6 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest
{
}
}
- test("test watchdog: simple SELECT STATEMENT") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- List("", "ORDER BY c1", "ORDER BY c2").foreach { sort =>
- List("", " DISTINCT").foreach { distinct =>
- assert(sql(
- s"""
- |SELECT $distinct *
- |FROM t1
- |$sort
-
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- List("", "ORDER BY c1", "ORDER BY c2").foreach { sort =>
- List("", "DISTINCT").foreach { distinct =>
- assert(sql(
- s"""
- |SELECT $distinct *
- |FROM t1
- |$sort
- |LIMIT $limit
-
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
- }
- }
-
- test("test watchdog: SELECT ... WITH AGGREGATE STATEMENT ") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- assert(!sql("SELECT count(*) FROM t1")
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-
- val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1",
"ORDER BY c1, cnt")
- val havingConditions = List("", "HAVING cnt > 1")
-
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |SELECT c1, COUNT(*) as cnt
- |FROM t1
- |GROUP BY c1
- |$having
- |$sort
-
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |SELECT c1, COUNT(*) as cnt
- |FROM t1
- |GROUP BY c1
- |$having
- |$sort
- |LIMIT $limit
-
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
- }
- }
-
- test("test watchdog: SELECT with CTE forceMaxOutputRows") {
- // simple CTE
- val q1 =
- """
- |WITH t2 AS (
- | SELECT * FROM t1
- |)
- |""".stripMargin
-
- // nested CTE
- val q2 =
- """
- |WITH
- | t AS (SELECT * FROM t1),
- | t2 AS (
- | WITH t3 AS (SELECT * FROM t1)
- | SELECT * FROM t3
- | )
- |""".stripMargin
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- val sorts = List("", "ORDER BY c1", "ORDER BY c2")
-
- sorts.foreach { sort =>
- Seq(q1, q2).foreach { withQuery =>
- assert(sql(
- s"""
- |$withQuery
- |SELECT * FROM t2
- |$sort
-
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- sorts.foreach { sort =>
- Seq(q1, q2).foreach { withQuery =>
- assert(sql(
- s"""
- |$withQuery
- |SELECT * FROM t2
- |$sort
- |LIMIT $limit
-
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
- }
- }
-
- test("test watchdog: SELECT AGGREGATE WITH CTE forceMaxOutputRows") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- assert(!sql(
- """
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT COUNT(*)
- |FROM custom_cte
- |""".stripMargin).queryExecution
- .analyzed.isInstanceOf[GlobalLimit])
-
- val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1",
"ORDER BY c1, cnt")
- val havingConditions = List("", "HAVING cnt > 1")
-
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT c1, COUNT(*) as cnt
- |FROM custom_cte
- |GROUP BY c1
- |$having
- |$sort
-
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT c1, COUNT(*) as cnt
- |FROM custom_cte
- |GROUP BY c1
- |$having
- |$sort
- |LIMIT $limit
-
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
- }
- }
-
- test("test watchdog: UNION Statement for forceMaxOutputRows") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- List("", "ALL").foreach { x =>
- assert(sql(
- s"""
- |SELECT c1, c2 FROM t1
- |UNION $x
- |SELECT c1, c2 FROM t2
- |UNION $x
- |SELECT c1, c2 FROM t3
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
-
- val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1",
"ORDER BY c1, cnt")
- val havingConditions = List("", "HAVING cnt > 1")
-
- List("", "ALL").foreach { x =>
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |SELECT c1, count(c2) as cnt
- |FROM t1
- |GROUP BY c1
- |$having
- |UNION $x
- |SELECT c1, COUNT(c2) as cnt
- |FROM t2
- |GROUP BY c1
- |$having
- |UNION $x
- |SELECT c1, COUNT(c2) as cnt
- |FROM t3
- |GROUP BY c1
- |$having
- |$sort
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- assert(sql(
- s"""
- |SELECT c1, c2 FROM t1
- |UNION
- |SELECT c1, c2 FROM t2
- |UNION
- |SELECT c1, c2 FROM t3
- |LIMIT $limit
- |""".stripMargin)
- .queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
-
- test("test watchdog: Select View Statement for forceMaxOutputRows") {
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "3") {
- withTable("tmp_table", "tmp_union") {
- withView("tmp_view", "tmp_view2") {
- sql(s"create table tmp_table (a int, b int)")
- sql(s"insert into tmp_table values
(1,10),(2,20),(3,30),(4,40),(5,50)")
- sql(s"create table tmp_union (a int, b int)")
- sql(s"insert into tmp_union values
(6,60),(7,70),(8,80),(9,90),(10,100)")
- sql(s"create view tmp_view2 as select * from tmp_union")
- assert(!sql(
- s"""
- |CREATE VIEW tmp_view
- |as
- |SELECT * FROM
- |tmp_table
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-
- assert(sql(
- s"""
- |SELECT * FROM
- |tmp_view
- |""".stripMargin)
- .queryExecution.optimizedPlan.maxRows.contains(3))
-
- assert(sql(
- s"""
- |SELECT * FROM
- |tmp_view
- |limit 11
- |""".stripMargin)
- .queryExecution.optimizedPlan.maxRows.contains(3))
-
- assert(sql(
- s"""
- |SELECT * FROM
- |(select * from tmp_view
- |UNION
- |select * from tmp_view2)
- |ORDER BY a
- |DESC
- |""".stripMargin)
- .collect().head.get(0) === 10)
- }
- }
- }
- }
-
- test("test watchdog: Insert Statement for forceMaxOutputRows") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
- withTable("tmp_table", "tmp_insert") {
- spark.sql(s"create table tmp_table (a int, b int)")
- spark.sql(s"insert into tmp_table values
(1,10),(2,20),(3,30),(4,40),(5,50)")
- val multiInsertTableName1: String = "tmp_tbl1"
- val multiInsertTableName2: String = "tmp_tbl2"
- sql(s"drop table if exists $multiInsertTableName1")
- sql(s"drop table if exists $multiInsertTableName2")
- sql(s"create table $multiInsertTableName1 like tmp_table")
- sql(s"create table $multiInsertTableName2 like tmp_table")
- assert(!sql(
- s"""
- |FROM tmp_table
- |insert into $multiInsertTableName1 select * limit 2
- |insert into $multiInsertTableName2 select *
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
- }
-
- test("test watchdog: Distribute by for forceMaxOutputRows") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
- withTable("tmp_table") {
- spark.sql(s"create table tmp_table (a int, b int)")
- spark.sql(s"insert into tmp_table values
(1,10),(2,20),(3,30),(4,40),(5,50)")
- assert(sql(
- s"""
- |SELECT *
- |FROM tmp_table
- |DISTRIBUTE BY a
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
- }
-
- test("test watchdog: Subquery for forceMaxOutputRows") {
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "1") {
- withTable("tmp_table1") {
- sql("CREATE TABLE spark_catalog.`default`.tmp_table1(KEY INT, VALUE
STRING) USING PARQUET")
- sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table1 " +
- "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
- assert(
- sql("select * from
tmp_table1").queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- val testSqlText =
- """
- |select count(*)
- |from tmp_table1
- |where tmp_table1.key in (
- |select distinct tmp_table1.key
- |from tmp_table1
- |where tmp_table1.value = "aa"
- |)
- |""".stripMargin
- val plan = sql(testSqlText).queryExecution.optimizedPlan
- assert(!findGlobalLimit(plan))
- checkAnswer(sql(testSqlText), Row(3) :: Nil)
- }
-
- def findGlobalLimit(plan: LogicalPlan): Boolean = plan match {
- case _: GlobalLimit => true
- case p if p.children.isEmpty => false
- case p => p.children.exists(findGlobalLimit)
- }
-
- }
- }
-
- test("test watchdog: Join for forceMaxOutputRows") {
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "1") {
- withTable("tmp_table1", "tmp_table2") {
- sql("CREATE TABLE spark_catalog.`default`.tmp_table1(KEY INT, VALUE
STRING) USING PARQUET")
- sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table1 " +
- "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
- sql("CREATE TABLE spark_catalog.`default`.tmp_table2(KEY INT, VALUE
STRING) USING PARQUET")
- sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table2 " +
- "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
- val testSqlText =
- """
- |select a.*,b.*
- |from tmp_table1 a
- |join
- |tmp_table2 b
- |on a.KEY = b.KEY
- |""".stripMargin
- val plan = sql(testSqlText).queryExecution.optimizedPlan
- assert(findGlobalLimit(plan))
- }
-
- def findGlobalLimit(plan: LogicalPlan): Boolean = plan match {
- case _: GlobalLimit => true
- case p if p.children.isEmpty => false
- case p => p.children.exists(findGlobalLimit)
- }
- }
- }
-
private def checkMaxFileSize(tableSize: Long, nonPartTableSize: Long): Unit
= {
withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key ->
tableSize.toString) {
checkAnswer(sql("SELECT count(distinct(p)) FROM test"), Row(10) :: Nil)
diff --git
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index e25d8a6777..e72a6c0735 100644
---
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -123,14 +123,6 @@ object KyuubiSQLConf {
.bytesConf(ByteUnit.BYTE)
.createOptional
- val WATCHDOG_FORCED_MAXOUTPUTROWS =
- buildConf("spark.sql.watchdog.forcedMaxOutputRows")
- .doc("Add ForcedMaxOutputRows rule to avoid huge output rows of
non-limit query " +
- "unexpectedly, it's optional that works with defined")
- .version("1.4.0")
- .intConf
- .createOptional
-
val DROP_IGNORE_NONEXISTENT =
buildConf("spark.sql.optimizer.dropIgnoreNonExistent")
.doc("Do not report an error if DROP
DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies " +
diff --git
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index fd11fb5f57..33ff3e3177 100644
---
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.{FinalStageResourceManager,
InjectCustomResourceProfile, SparkSessionExtensions}
-import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule,
KyuubiUnsupportedOperationsCheck, MaxScanStrategy}
+import org.apache.kyuubi.sql.watchdog.{KyuubiUnsupportedOperationsCheck,
MaxScanStrategy}
// scalastyle:off line.size.limit
/**
@@ -38,7 +38,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions
=> Unit) {
// watchdog extension
extensions.injectCheckRule(_ => KyuubiUnsupportedOperationsCheck)
- extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxScanStrategy)
extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
diff --git
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
deleted file mode 100644
index dd329bd659..0000000000
---
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.DataWritingCommand
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/*
- * Add ForcedMaxOutputRows rule for output rows limitation
- * to avoid huge output rows of non_limit query unexpectedly
- * mainly applied to cases as below:
- *
- * case 1:
- * {{{
- * SELECT [c1, c2, ...]
- * }}}
- *
- * case 2:
- * {{{
- * WITH CTE AS (
- * ...)
- * SELECT [c1, c2, ...] FROM CTE ...
- * }}}
- *
- * The Logical Rule add a GlobalLimit node before root project
- * */
-trait ForcedMaxOutputRowsBase extends Rule[LogicalPlan] {
-
- protected def isChildAggregate(a: Aggregate): Boolean
-
- protected def canInsertLimitInner(p: LogicalPlan): Boolean = p match {
- case Aggregate(_, Alias(_, "havingCondition") :: Nil, _) => false
- case agg: Aggregate => !isChildAggregate(agg)
- case _: RepartitionByExpression => true
- case _: Distinct => true
- case _: Filter => true
- case _: Project => true
- case Limit(_, _) => true
- case _: Sort => true
- case Union(children, _, _) =>
- if (children.exists(_.isInstanceOf[DataWritingCommand])) {
- false
- } else {
- true
- }
- case _: MultiInstanceRelation => true
- case _: Join => true
- case _ => false
- }
-
- protected def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt: Option[Int]):
Boolean = {
- maxOutputRowsOpt match {
- case Some(forcedMaxOutputRows) if forcedMaxOutputRows >= 0 =>
- canInsertLimitInner(p) && !p.maxRows.exists(_ <= forcedMaxOutputRows)
- case _ => false
- }
- }
-
- override def apply(plan: LogicalPlan): LogicalPlan = {
- val maxOutputRowsOpt =
conf.getConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS)
- plan match {
- case p if p.resolved && canInsertLimit(p, maxOutputRowsOpt) =>
- Limit(
- maxOutputRowsOpt.get,
- plan)
- case _ => plan
- }
- }
-}
diff --git
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
deleted file mode 100644
index a3d990b109..0000000000
---
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CommandResult,
LogicalPlan, Union, WithCTE}
-import org.apache.spark.sql.execution.command.DataWritingCommand
-
-case class ForcedMaxOutputRowsRule(sparkSession: SparkSession) extends
ForcedMaxOutputRowsBase {
-
- override protected def isChildAggregate(a: Aggregate): Boolean = false
-
- override protected def canInsertLimitInner(p: LogicalPlan): Boolean = p
match {
- case WithCTE(plan, _) => this.canInsertLimitInner(plan)
- case plan: LogicalPlan => plan match {
- case Union(children, _, _) => !children.exists {
- case _: DataWritingCommand => true
- case p: CommandResult if
p.commandLogicalPlan.isInstanceOf[DataWritingCommand] => true
- case _ => false
- }
- case _ => super.canInsertLimitInner(plan)
- }
- }
-
- override protected def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt:
Option[Int]): Boolean = {
- p match {
- case WithCTE(plan, _) => this.canInsertLimit(plan, maxOutputRowsOpt)
- case _ => super.canInsertLimit(p, maxOutputRowsOpt)
- }
- }
-}
diff --git
a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
index aec51cbd37..a392c96942 100644
---
a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
@@ -22,7 +22,6 @@ import java.io.File
import scala.collection.JavaConverters._
import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
@@ -96,394 +95,6 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest
{
}
}
- test("test watchdog: simple SELECT STATEMENT") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- List("", "ORDER BY c1", "ORDER BY c2").foreach { sort =>
- List("", " DISTINCT").foreach { distinct =>
- assert(sql(
- s"""
- |SELECT $distinct *
- |FROM t1
- |$sort
-
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- List("", "ORDER BY c1", "ORDER BY c2").foreach { sort =>
- List("", "DISTINCT").foreach { distinct =>
- assert(sql(
- s"""
- |SELECT $distinct *
- |FROM t1
- |$sort
- |LIMIT $limit
-
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
- }
- }
-
- test("test watchdog: SELECT ... WITH AGGREGATE STATEMENT ") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- assert(!sql("SELECT count(*) FROM t1")
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-
- val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1",
"ORDER BY c1, cnt")
- val havingConditions = List("", "HAVING cnt > 1")
-
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |SELECT c1, COUNT(*) as cnt
- |FROM t1
- |GROUP BY c1
- |$having
- |$sort
-
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |SELECT c1, COUNT(*) as cnt
- |FROM t1
- |GROUP BY c1
- |$having
- |$sort
- |LIMIT $limit
-
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
- }
- }
-
- test("test watchdog: SELECT with CTE forceMaxOutputRows") {
- // simple CTE
- val q1 =
- """
- |WITH t2 AS (
- | SELECT * FROM t1
- |)
- |""".stripMargin
-
- // nested CTE
- val q2 =
- """
- |WITH
- | t AS (SELECT * FROM t1),
- | t2 AS (
- | WITH t3 AS (SELECT * FROM t1)
- | SELECT * FROM t3
- | )
- |""".stripMargin
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- val sorts = List("", "ORDER BY c1", "ORDER BY c2")
-
- sorts.foreach { sort =>
- Seq(q1, q2).foreach { withQuery =>
- assert(sql(
- s"""
- |$withQuery
- |SELECT * FROM t2
- |$sort
-
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- sorts.foreach { sort =>
- Seq(q1, q2).foreach { withQuery =>
- assert(sql(
- s"""
- |$withQuery
- |SELECT * FROM t2
- |$sort
- |LIMIT $limit
-
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
- }
- }
-
- test("test watchdog: SELECT AGGREGATE WITH CTE forceMaxOutputRows") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- assert(!sql(
- """
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT COUNT(*)
- |FROM custom_cte
- |""".stripMargin).queryExecution
- .analyzed.isInstanceOf[GlobalLimit])
-
- val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1",
"ORDER BY c1, cnt")
- val havingConditions = List("", "HAVING cnt > 1")
-
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT c1, COUNT(*) as cnt
- |FROM custom_cte
- |GROUP BY c1
- |$having
- |$sort
-
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT c1, COUNT(*) as cnt
- |FROM custom_cte
- |GROUP BY c1
- |$having
- |$sort
- |LIMIT $limit
-
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
- }
- }
-
- test("test watchdog: UNION Statement for forceMaxOutputRows") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- List("", "ALL").foreach { x =>
- assert(sql(
- s"""
- |SELECT c1, c2 FROM t1
- |UNION $x
- |SELECT c1, c2 FROM t2
- |UNION $x
- |SELECT c1, c2 FROM t3
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
-
- val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1",
"ORDER BY c1, cnt")
- val havingConditions = List("", "HAVING cnt > 1")
-
- List("", "ALL").foreach { x =>
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |SELECT c1, count(c2) as cnt
- |FROM t1
- |GROUP BY c1
- |$having
- |UNION $x
- |SELECT c1, COUNT(c2) as cnt
- |FROM t2
- |GROUP BY c1
- |$having
- |UNION $x
- |SELECT c1, COUNT(c2) as cnt
- |FROM t3
- |GROUP BY c1
- |$having
- |$sort
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- assert(sql(
- s"""
- |SELECT c1, c2 FROM t1
- |UNION
- |SELECT c1, c2 FROM t2
- |UNION
- |SELECT c1, c2 FROM t3
- |LIMIT $limit
- |""".stripMargin)
- .queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
-
- test("test watchdog: Select View Statement for forceMaxOutputRows") {
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "3") {
- withTable("tmp_table", "tmp_union") {
- withView("tmp_view", "tmp_view2") {
- sql(s"create table tmp_table (a int, b int)")
- sql(s"insert into tmp_table values
(1,10),(2,20),(3,30),(4,40),(5,50)")
- sql(s"create table tmp_union (a int, b int)")
- sql(s"insert into tmp_union values
(6,60),(7,70),(8,80),(9,90),(10,100)")
- sql(s"create view tmp_view2 as select * from tmp_union")
- assert(!sql(
- s"""
- |CREATE VIEW tmp_view
- |as
- |SELECT * FROM
- |tmp_table
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-
- assert(sql(
- s"""
- |SELECT * FROM
- |tmp_view
- |""".stripMargin)
- .queryExecution.optimizedPlan.maxRows.contains(3))
-
- assert(sql(
- s"""
- |SELECT * FROM
- |tmp_view
- |limit 11
- |""".stripMargin)
- .queryExecution.optimizedPlan.maxRows.contains(3))
-
- assert(sql(
- s"""
- |SELECT * FROM
- |(select * from tmp_view
- |UNION
- |select * from tmp_view2)
- |ORDER BY a
- |DESC
- |""".stripMargin)
- .collect().head.get(0) === 10)
- }
- }
- }
- }
-
- test("test watchdog: Insert Statement for forceMaxOutputRows") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
- withTable("tmp_table", "tmp_insert") {
- spark.sql(s"create table tmp_table (a int, b int)")
- spark.sql(s"insert into tmp_table values
(1,10),(2,20),(3,30),(4,40),(5,50)")
- val multiInsertTableName1: String = "tmp_tbl1"
- val multiInsertTableName2: String = "tmp_tbl2"
- sql(s"drop table if exists $multiInsertTableName1")
- sql(s"drop table if exists $multiInsertTableName2")
- sql(s"create table $multiInsertTableName1 like tmp_table")
- sql(s"create table $multiInsertTableName2 like tmp_table")
- assert(!sql(
- s"""
- |FROM tmp_table
- |insert into $multiInsertTableName1 select * limit 2
- |insert into $multiInsertTableName2 select *
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
- }
-
- test("test watchdog: Distribute by for forceMaxOutputRows") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
- withTable("tmp_table") {
- spark.sql(s"create table tmp_table (a int, b int)")
- spark.sql(s"insert into tmp_table values
(1,10),(2,20),(3,30),(4,40),(5,50)")
- assert(sql(
- s"""
- |SELECT *
- |FROM tmp_table
- |DISTRIBUTE BY a
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
- }
-
- test("test watchdog: Subquery for forceMaxOutputRows") {
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "1") {
- withTable("tmp_table1") {
- sql("CREATE TABLE spark_catalog.`default`.tmp_table1(KEY INT, VALUE
STRING) USING PARQUET")
- sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table1 " +
- "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
- assert(
- sql("select * from
tmp_table1").queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- val testSqlText =
- """
- |select count(*)
- |from tmp_table1
- |where tmp_table1.key in (
- |select distinct tmp_table1.key
- |from tmp_table1
- |where tmp_table1.value = "aa"
- |)
- |""".stripMargin
- val plan = sql(testSqlText).queryExecution.optimizedPlan
- assert(!findGlobalLimit(plan))
- checkAnswer(sql(testSqlText), Row(3) :: Nil)
- }
-
- def findGlobalLimit(plan: LogicalPlan): Boolean = plan match {
- case _: GlobalLimit => true
- case p if p.children.isEmpty => false
- case p => p.children.exists(findGlobalLimit)
- }
-
- }
- }
-
- test("test watchdog: Join for forceMaxOutputRows") {
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "1") {
- withTable("tmp_table1", "tmp_table2") {
- sql("CREATE TABLE spark_catalog.`default`.tmp_table1(KEY INT, VALUE
STRING) USING PARQUET")
- sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table1 " +
- "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
- sql("CREATE TABLE spark_catalog.`default`.tmp_table2(KEY INT, VALUE
STRING) USING PARQUET")
- sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table2 " +
- "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
- val testSqlText =
- """
- |select a.*,b.*
- |from tmp_table1 a
- |join
- |tmp_table2 b
- |on a.KEY = b.KEY
- |""".stripMargin
- val plan = sql(testSqlText).queryExecution.optimizedPlan
- assert(findGlobalLimit(plan))
- }
-
- def findGlobalLimit(plan: LogicalPlan): Boolean = plan match {
- case _: GlobalLimit => true
- case p if p.children.isEmpty => false
- case p => p.children.exists(findGlobalLimit)
- }
- }
- }
-
private def checkMaxFileSize(tableSize: Long, nonPartTableSize: Long): Unit
= {
withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key ->
tableSize.toString) {
checkAnswer(sql("SELECT count(distinct(p)) FROM test"), Row(10) :: Nil)
diff --git
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index 4218c41fa1..9644537a06 100644
---
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -123,14 +123,6 @@ object KyuubiSQLConf {
.bytesConf(ByteUnit.BYTE)
.createOptional
- val WATCHDOG_FORCED_MAXOUTPUTROWS =
- buildConf("spark.sql.watchdog.forcedMaxOutputRows")
- .doc("Add ForcedMaxOutputRows rule to avoid huge output rows of
non-limit query " +
- "unexpectedly, it's optional that works with defined")
- .version("1.4.0")
- .intConf
- .createOptional
-
val DROP_IGNORE_NONEXISTENT =
buildConf("spark.sql.optimizer.dropIgnoreNonExistent")
.doc("Do not report an error if DROP
DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies " +
diff --git
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index 06fd6291d0..db7f4b6ea3 100644
---
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.{FinalStageResourceManager,
InjectCustomResourceProfile, SparkSessionExtensions}
-import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule,
KyuubiUnsupportedOperationsCheck, MaxScanStrategy}
+import org.apache.kyuubi.sql.watchdog.{KyuubiUnsupportedOperationsCheck,
MaxScanStrategy}
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource,
InsertZorderBeforeWritingHive, ResolveZorder}
// scalastyle:off line.size.limit
@@ -47,7 +47,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions
=> Unit) {
// watchdog extension
extensions.injectCheckRule(_ => KyuubiUnsupportedOperationsCheck)
- extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxScanStrategy)
extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
diff --git
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
deleted file mode 100644
index dd329bd659..0000000000
---
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.DataWritingCommand
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/*
- * Add ForcedMaxOutputRows rule for output rows limitation
- * to avoid huge output rows of non_limit query unexpectedly
- * mainly applied to cases as below:
- *
- * case 1:
- * {{{
- * SELECT [c1, c2, ...]
- * }}}
- *
- * case 2:
- * {{{
- * WITH CTE AS (
- * ...)
- * SELECT [c1, c2, ...] FROM CTE ...
- * }}}
- *
- * The Logical Rule add a GlobalLimit node before root project
- * */
-trait ForcedMaxOutputRowsBase extends Rule[LogicalPlan] {
-
- protected def isChildAggregate(a: Aggregate): Boolean
-
- protected def canInsertLimitInner(p: LogicalPlan): Boolean = p match {
- case Aggregate(_, Alias(_, "havingCondition") :: Nil, _) => false
- case agg: Aggregate => !isChildAggregate(agg)
- case _: RepartitionByExpression => true
- case _: Distinct => true
- case _: Filter => true
- case _: Project => true
- case Limit(_, _) => true
- case _: Sort => true
- case Union(children, _, _) =>
- if (children.exists(_.isInstanceOf[DataWritingCommand])) {
- false
- } else {
- true
- }
- case _: MultiInstanceRelation => true
- case _: Join => true
- case _ => false
- }
-
- protected def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt: Option[Int]):
Boolean = {
- maxOutputRowsOpt match {
- case Some(forcedMaxOutputRows) if forcedMaxOutputRows >= 0 =>
- canInsertLimitInner(p) && !p.maxRows.exists(_ <= forcedMaxOutputRows)
- case _ => false
- }
- }
-
- override def apply(plan: LogicalPlan): LogicalPlan = {
- val maxOutputRowsOpt =
conf.getConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS)
- plan match {
- case p if p.resolved && canInsertLimit(p, maxOutputRowsOpt) =>
- Limit(
- maxOutputRowsOpt.get,
- plan)
- case _ => plan
- }
- }
-}
diff --git
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
deleted file mode 100644
index a3d990b109..0000000000
---
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CommandResult,
LogicalPlan, Union, WithCTE}
-import org.apache.spark.sql.execution.command.DataWritingCommand
-
-case class ForcedMaxOutputRowsRule(sparkSession: SparkSession) extends
ForcedMaxOutputRowsBase {
-
- override protected def isChildAggregate(a: Aggregate): Boolean = false
-
- override protected def canInsertLimitInner(p: LogicalPlan): Boolean = p
match {
- case WithCTE(plan, _) => this.canInsertLimitInner(plan)
- case plan: LogicalPlan => plan match {
- case Union(children, _, _) => !children.exists {
- case _: DataWritingCommand => true
- case p: CommandResult if
p.commandLogicalPlan.isInstanceOf[DataWritingCommand] => true
- case _ => false
- }
- case _ => super.canInsertLimitInner(plan)
- }
- }
-
- override protected def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt:
Option[Int]): Boolean = {
- p match {
- case WithCTE(plan, _) => this.canInsertLimit(plan, maxOutputRowsOpt)
- case _ => super.canInsertLimit(p, maxOutputRowsOpt)
- }
- }
-}
diff --git
a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
index 87d18a2a44..0cf8a6a183 100644
---
a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
@@ -22,7 +22,6 @@ import java.io.File
import scala.collection.JavaConverters._
import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
@@ -96,394 +95,6 @@ class WatchDogSuite extends KyuubiSparkSQLExtensionTest {
}
}
- test("test watchdog: simple SELECT STATEMENT") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- List("", "ORDER BY c1", "ORDER BY c2").foreach { sort =>
- List("", " DISTINCT").foreach { distinct =>
- assert(sql(
- s"""
- |SELECT $distinct *
- |FROM t1
- |$sort
-
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- List("", "ORDER BY c1", "ORDER BY c2").foreach { sort =>
- List("", "DISTINCT").foreach { distinct =>
- assert(sql(
- s"""
- |SELECT $distinct *
- |FROM t1
- |$sort
- |LIMIT $limit
-
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
- }
- }
-
- test("test watchdog: SELECT ... WITH AGGREGATE STATEMENT ") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- assert(!sql("SELECT count(*) FROM t1")
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-
- val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1",
"ORDER BY c1, cnt")
- val havingConditions = List("", "HAVING cnt > 1")
-
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |SELECT c1, COUNT(*) as cnt
- |FROM t1
- |GROUP BY c1
- |$having
- |$sort
-
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |SELECT c1, COUNT(*) as cnt
- |FROM t1
- |GROUP BY c1
- |$having
- |$sort
- |LIMIT $limit
-
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
- }
- }
-
- test("test watchdog: SELECT with CTE forceMaxOutputRows") {
- // simple CTE
- val q1 =
- """
- |WITH t2 AS (
- | SELECT * FROM t1
- |)
- |""".stripMargin
-
- // nested CTE
- val q2 =
- """
- |WITH
- | t AS (SELECT * FROM t1),
- | t2 AS (
- | WITH t3 AS (SELECT * FROM t1)
- | SELECT * FROM t3
- | )
- |""".stripMargin
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- val sorts = List("", "ORDER BY c1", "ORDER BY c2")
-
- sorts.foreach { sort =>
- Seq(q1, q2).foreach { withQuery =>
- assert(sql(
- s"""
- |$withQuery
- |SELECT * FROM t2
- |$sort
-
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- sorts.foreach { sort =>
- Seq(q1, q2).foreach { withQuery =>
- assert(sql(
- s"""
- |$withQuery
- |SELECT * FROM t2
- |$sort
- |LIMIT $limit
-
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
- }
- }
-
- test("test watchdog: SELECT AGGREGATE WITH CTE forceMaxOutputRows") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- assert(!sql(
- """
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT COUNT(*)
- |FROM custom_cte
- |""".stripMargin).queryExecution
- .analyzed.isInstanceOf[GlobalLimit])
-
- val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1",
"ORDER BY c1, cnt")
- val havingConditions = List("", "HAVING cnt > 1")
-
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT c1, COUNT(*) as cnt
- |FROM custom_cte
- |GROUP BY c1
- |$having
- |$sort
-
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT c1, COUNT(*) as cnt
- |FROM custom_cte
- |GROUP BY c1
- |$having
- |$sort
- |LIMIT $limit
-
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
- }
- }
-
- test("test watchdog: UNION Statement for forceMaxOutputRows") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- List("", "ALL").foreach { x =>
- assert(sql(
- s"""
- |SELECT c1, c2 FROM t1
- |UNION $x
- |SELECT c1, c2 FROM t2
- |UNION $x
- |SELECT c1, c2 FROM t3
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
-
- val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1",
"ORDER BY c1, cnt")
- val havingConditions = List("", "HAVING cnt > 1")
-
- List("", "ALL").foreach { x =>
- havingConditions.foreach { having =>
- sorts.foreach { sort =>
- assert(sql(
- s"""
- |SELECT c1, count(c2) as cnt
- |FROM t1
- |GROUP BY c1
- |$having
- |UNION $x
- |SELECT c1, COUNT(c2) as cnt
- |FROM t2
- |GROUP BY c1
- |$having
- |UNION $x
- |SELECT c1, COUNT(c2) as cnt
- |FROM t3
- |GROUP BY c1
- |$having
- |$sort
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
- }
-
- limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
- assert(sql(
- s"""
- |SELECT c1, c2 FROM t1
- |UNION
- |SELECT c1, c2 FROM t2
- |UNION
- |SELECT c1, c2 FROM t3
- |LIMIT $limit
- |""".stripMargin)
- .queryExecution.optimizedPlan.maxRows.contains(expected))
- }
- }
- }
-
- test("test watchdog: Select View Statement for forceMaxOutputRows") {
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "3") {
- withTable("tmp_table", "tmp_union") {
- withView("tmp_view", "tmp_view2") {
- sql(s"create table tmp_table (a int, b int)")
- sql(s"insert into tmp_table values
(1,10),(2,20),(3,30),(4,40),(5,50)")
- sql(s"create table tmp_union (a int, b int)")
- sql(s"insert into tmp_union values
(6,60),(7,70),(8,80),(9,90),(10,100)")
- sql(s"create view tmp_view2 as select * from tmp_union")
- assert(!sql(
- s"""
- |CREATE VIEW tmp_view
- |as
- |SELECT * FROM
- |tmp_table
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-
- assert(sql(
- s"""
- |SELECT * FROM
- |tmp_view
- |""".stripMargin)
- .queryExecution.optimizedPlan.maxRows.contains(3))
-
- assert(sql(
- s"""
- |SELECT * FROM
- |tmp_view
- |limit 11
- |""".stripMargin)
- .queryExecution.optimizedPlan.maxRows.contains(3))
-
- assert(sql(
- s"""
- |SELECT * FROM
- |(select * from tmp_view
- |UNION
- |select * from tmp_view2)
- |ORDER BY a
- |DESC
- |""".stripMargin)
- .collect().head.get(0) === 10)
- }
- }
- }
- }
-
- test("test watchdog: Insert Statement for forceMaxOutputRows") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
- withTable("tmp_table", "tmp_insert") {
- spark.sql(s"create table tmp_table (a int, b int)")
- spark.sql(s"insert into tmp_table values
(1,10),(2,20),(3,30),(4,40),(5,50)")
- val multiInsertTableName1: String = "tmp_tbl1"
- val multiInsertTableName2: String = "tmp_tbl2"
- sql(s"drop table if exists $multiInsertTableName1")
- sql(s"drop table if exists $multiInsertTableName2")
- sql(s"create table $multiInsertTableName1 like tmp_table")
- sql(s"create table $multiInsertTableName2 like tmp_table")
- assert(!sql(
- s"""
- |FROM tmp_table
- |insert into $multiInsertTableName1 select * limit 2
- |insert into $multiInsertTableName2 select *
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
- }
-
- test("test watchdog: Distribute by for forceMaxOutputRows") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
- withTable("tmp_table") {
- spark.sql(s"create table tmp_table (a int, b int)")
- spark.sql(s"insert into tmp_table values
(1,10),(2,20),(3,30),(4,40),(5,50)")
- assert(sql(
- s"""
- |SELECT *
- |FROM tmp_table
- |DISTRIBUTE BY a
- |""".stripMargin)
- .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- }
- }
- }
-
- test("test watchdog: Subquery for forceMaxOutputRows") {
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "1") {
- withTable("tmp_table1") {
- sql("CREATE TABLE spark_catalog.`default`.tmp_table1(KEY INT, VALUE
STRING) USING PARQUET")
- sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table1 " +
- "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
- assert(
- sql("select * from
tmp_table1").queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
- val testSqlText =
- """
- |select count(*)
- |from tmp_table1
- |where tmp_table1.key in (
- |select distinct tmp_table1.key
- |from tmp_table1
- |where tmp_table1.value = "aa"
- |)
- |""".stripMargin
- val plan = sql(testSqlText).queryExecution.optimizedPlan
- assert(!findGlobalLimit(plan))
- checkAnswer(sql(testSqlText), Row(3) :: Nil)
- }
-
- def findGlobalLimit(plan: LogicalPlan): Boolean = plan match {
- case _: GlobalLimit => true
- case p if p.children.isEmpty => false
- case p => p.children.exists(findGlobalLimit)
- }
-
- }
- }
-
- test("test watchdog: Join for forceMaxOutputRows") {
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "1") {
- withTable("tmp_table1", "tmp_table2") {
- sql("CREATE TABLE spark_catalog.`default`.tmp_table1(KEY INT, VALUE
STRING) USING PARQUET")
- sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table1 " +
- "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
- sql("CREATE TABLE spark_catalog.`default`.tmp_table2(KEY INT, VALUE
STRING) USING PARQUET")
- sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table2 " +
- "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
- val testSqlText =
- """
- |select a.*,b.*
- |from tmp_table1 a
- |join
- |tmp_table2 b
- |on a.KEY = b.KEY
- |""".stripMargin
- val plan = sql(testSqlText).queryExecution.optimizedPlan
- assert(findGlobalLimit(plan))
- }
-
- def findGlobalLimit(plan: LogicalPlan): Boolean = plan match {
- case _: GlobalLimit => true
- case p if p.children.isEmpty => false
- case p => p.children.exists(findGlobalLimit)
- }
- }
- }
-
private def checkMaxFileSize(tableSize: Long, nonPartTableSize: Long): Unit
= {
withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key ->
tableSize.toString) {
checkAnswer(sql("SELECT count(distinct(p)) FROM test"), Row(10) :: Nil)