This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new cc3ca56 [KYUUBI #1085] Add forcedMaxOutputRows rule for limitation to
avoid huge output unexpectedly
cc3ca56 is described below
commit cc3ca56ab23936b237b084eca508c7853fbadaeb
Author: h <[email protected]>
AuthorDate: Sat Sep 18 12:31:37 2021 +0800
[KYUUBI #1085] Add forcedMaxOutputRows rule for limitation to avoid huge
output unexpectedly
Add MaxOutputRows rule for output rows limitation to avoid huge output
unexpectedly
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add
<img width="1440" alt="截屏2021-09-12 下午12 19 28"
src="https://user-images.githubusercontent.com/635169/132972063-b12937bb-807a-47bd-8d21-835d83031191.png">
'[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
This PR adds a limit feature that prevents unexpectedly huge output rows from
user ad-hoc queries. Generally, an ad-hoc query is like finding a needle in a
haystack: the user picks out a few computed result rows from the huge data in
the warehouse. The rule is mainly applied in the cases below:
- CASE 1:
```
SELECT [c1, c2, ...]
```
- CASE 2:
```
WITH CTE AS (...)
SELECT [c1, c2, ...] FROM Express(CTE) ...
```
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
<img width="1440" alt="截屏2021-09-12 下午12 19 28"
src="https://user-images.githubusercontent.com/635169/132972078-c4821135-0520-420d-9ab8-24e124f6c6c9.png">
- [x] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1085 from i7xh/watchdogMaxOutputRows.
Closes #1085
77aa6ff3 [h] Resolve issue: unify maxOutputRows and using unittest setupData
939d2556 [h] Resolve issue: well-format code
5b0383dd [h] Fix issue: Final Limit according to
spark.sql.watchdog.forcedMaxOutputRows
7dcbb0a4 [h] Resolve issue: remove origin match from rule
f4dff4cf [h] Resolve issue: update forcedMaxOutputRows config doc
ae21c1ac [h] Resolved issue: Support Aggregate force limitation and Remove
InsertIntoDataSourceDirCommand process
a9d3640b [h] Resolved code review issue
01c87fd2 [h] Add MaxOutputRows rule for output rows limitation to avoid
huge output rows of query unexpectedly
Authored-by: h <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 8 ++
.../kyuubi/sql/KyuubiSparkSQLExtension.scala | 5 +-
.../sql/watchdog/ForcedMaxOutputRowsRule.scala | 72 +++++++++++++++++
.../apache/spark/sql/KyuubiExtensionSuite.scala | 94 +++++++++++++++++++++-
4 files changed, 176 insertions(+), 3 deletions(-)
diff --git
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index cb21bcd..57adc3f 100644
---
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -94,4 +94,12 @@ object KyuubiSQLConf {
.version("1.4.0")
.intConf
.createOptional
+
+ val WATCHDOG_FORCED_MAXOUTPUTROWS =
+ buildConf("spark.sql.watchdog.forcedMaxOutputRows")
+ .doc("Add ForcedMaxOutputRows rule to avoid huge output rows of
non-limit query " +
+ "unexpectedly, it's optional that works with defined")
+ .version("1.4.0")
+ .intConf
+ .createOptional
}
diff --git
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index 8110017..c11d65c 100644
---
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -20,8 +20,8 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification
-import org.apache.kyuubi.sql.watchdog.MaxHivePartitionStrategy
-import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource,
InsertZorderBeforeWritingHive, ResolveZorder, ZorderSparkSqlExtensionsParser}
+import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule,
MaxHivePartitionStrategy}
+import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource,
InsertZorderBeforeWritingHive}
import org.apache.kyuubi.sql.zorder.ResolveZorder
import org.apache.kyuubi.sql.zorder.ZorderSparkSqlExtensionsParser
@@ -53,5 +53,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions
=> Unit) {
extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
extensions.injectQueryStagePrepRule(FinalStageConfigIsolation(_))
extensions.injectPlannerStrategy(MaxHivePartitionStrategy)
+ extensions.injectPostHocResolutionRule(ForcedMaxOutputRowsRule)
}
}
diff --git
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
new file mode 100644
index 0000000..c2e3ee4
--- /dev/null
+++
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Limit,
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+/*
+* Add ForcedMaxOutputRows rule for output rows limitation
+* to avoid huge output rows of non_limit query unexpectedly
+* mainly applied to cases as below:
+*
+* case 1:
+* {{{
+* SELECT [c1, c2, ...]
+* }}}
+*
+* case 2:
+* {{{
+* WITH CTE AS (
+* ...)
+* SELECT [c1, c2, ...] FROM CTE ...
+* }}}
+*
+* The Logical Rule add a GlobalLimit node before root project
+* */
+case class ForcedMaxOutputRowsRule(session: SparkSession) extends
Rule[LogicalPlan] {
+
+ private def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt: Option[Int]):
Boolean = {
+
+ maxOutputRowsOpt match {
+ case Some(forcedMaxOutputRows) => val supported = p match {
+ case _: Project => true
+ case _: Aggregate => true
+ case Limit(_, _) => true
+ case _ => false
+ }
+ supported && !p.maxRows.exists(_ <= forcedMaxOutputRows)
+ case None => false
+ }
+
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ val maxOutputRowsOpt =
conf.getConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS)
+ plan match {
+ case p if p.resolved && canInsertLimit(p, maxOutputRowsOpt) => Limit(
+ maxOutputRowsOpt.get, plan)
+ case _ => plan
+ }
+ }
+
+}
diff --git
a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
index 85d17df..c94b33f 100644
---
a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
+++
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import scala.collection.mutable.Set
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Multiply}
-import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
+import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit,
RepartitionByExpression}
import org.apache.spark.sql.execution.adaptive.{CustomShuffleReaderExec,
QueryStageExec}
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS,
ShuffleExchangeLike}
import org.apache.spark.sql.hive.HiveUtils
@@ -1303,4 +1303,96 @@ class KyuubiExtensionSuite extends
KyuubiSparkSQLExtensionTest {
}
}
}
+
+ test("test watchdog with query forceMaxOutputRows") {
+
+ withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
+
+ assert(sql("SELECT * FROM t1")
+ .queryExecution.analyzed.isInstanceOf[GlobalLimit])
+
+ assert(sql("SELECT * FROM t1 LIMIT 1")
+ .queryExecution.analyzed.asInstanceOf[GlobalLimit].maxRows.contains(1))
+
+ assert(sql("SELECT * FROM t1 LIMIT 11")
+
.queryExecution.analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
+
+ assert(!sql("SELECT count(*) FROM t1")
+ .queryExecution.analyzed.isInstanceOf[GlobalLimit])
+
+ assert(sql(
+ """
+ |SELECT c1, COUNT(*)
+ |FROM t1
+ |GROUP BY c1
+ |""".stripMargin).queryExecution.analyzed.isInstanceOf[GlobalLimit])
+
+ assert(sql(
+ """
+ |WITH custom_cte AS (
+ |SELECT * FROM t1
+ |)
+ |
+ |SELECT * FROM custom_cte
+ |""".stripMargin).queryExecution
+ .analyzed.isInstanceOf[GlobalLimit])
+
+ assert(sql(
+ """
+ |WITH custom_cte AS (
+ |SELECT * FROM t1
+ |)
+ |
+ |SELECT * FROM custom_cte
+ |LIMIT 1
+ |""".stripMargin).queryExecution
+ .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(1))
+
+ assert(sql(
+ """
+ |WITH custom_cte AS (
+ |SELECT * FROM t1
+ |)
+ |
+ |SELECT * FROM custom_cte
+ |LIMIT 11
+ |""".stripMargin).queryExecution
+ .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
+
+ assert(!sql(
+ """
+ |WITH custom_cte AS (
+ |SELECT * FROM t1
+ |)
+ |
+ |SELECT COUNT(*) FROM custom_cte
+ |""".stripMargin).queryExecution
+ .analyzed.isInstanceOf[GlobalLimit])
+
+ assert(sql(
+ """
+ |WITH custom_cte AS (
+ |SELECT * FROM t1
+ |)
+ |
+ |SELECT c1, COUNT(*)
+ |FROM custom_cte
+ |GROUP BY c1
+ |""".stripMargin).queryExecution
+ .analyzed.isInstanceOf[GlobalLimit])
+
+ assert(sql(
+ """
+ |WITH custom_cte AS (
+ |SELECT * FROM t1
+ |)
+ |
+ |SELECT c1, COUNT(*)
+ |FROM custom_cte
+ |GROUP BY c1
+ |LIMIT 11
+ |""".stripMargin).queryExecution
+ .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
+ }
+ }
}