This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new cc3ca56  [KYUUBI #1085] Add forcedMaxOutputRows rule for limitation to 
avoid huge output unexpectedly
cc3ca56 is described below

commit cc3ca56ab23936b237b084eca508c7853fbadaeb
Author: h <[email protected]>
AuthorDate: Sat Sep 18 12:31:37 2021 +0800

    [KYUUBI #1085] Add forcedMaxOutputRows rule for limitation to avoid huge 
output unexpectedly
    
    Add MaxOutputRows rule for output rows limitation to avoid huge output 
unexpectedly
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add
    <img width="1440" alt="截屏2021-09-12 下午12 19 28" 
src="https://user-images.githubusercontent.com/635169/132972063-b12937bb-807a-47bd-8d21-835d83031191.png";>
     '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    This PR adds a limitation feature that avoids unexpectedly huge output rows 
in user ad-hoc queries. Generally, an ad-hoc query is like finding a needle in a 
haystack: the user picks only a few computed result rows out of the huge data in 
the warehouse. It is mainly used in the cases below:
    
    - CASE 1:
    ```
    SELECT [c1, c2, ...]
    ```
    - CASE 2:
    ```
    WITH CTE AS (...)
    SELECT [c1, c2, ...] FROM Express(CTE) ...
    ```
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    <img width="1440" alt="截屏2021-09-12 下午12 19 28" 
src="https://user-images.githubusercontent.com/635169/132972078-c4821135-0520-420d-9ab8-24e124f6c6c9.png";>
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #1085 from i7xh/watchdogMaxOutputRows.
    
    Closes #1085
    
    77aa6ff3 [h] Resolve issue: unify maxOutputRows and using unittest setupData
    939d2556 [h] Resolve issue: well-format code
    5b0383dd [h] Fix issue: Final Limit according to 
spark.sql.watchdog.forcedMaxOutputRows
    7dcbb0a4 [h] Resolve issue: remove origin match from rule
    f4dff4cf [h] Resolve issue: update forcedMaxOutputRows config doc
    ae21c1ac [h] Resolved issue: Support Aggregate force limitation and Remove 
InsertIntoDataSourceDirCommand process
    a9d3640b [h] Resolved code review issue
    01c87fd2 [h] Add MaxOutputRows rule for output rows limitation to avoid 
huge output rows of query unexpectedly
    
    Authored-by: h <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../org/apache/kyuubi/sql/KyuubiSQLConf.scala      |  8 ++
 .../kyuubi/sql/KyuubiSparkSQLExtension.scala       |  5 +-
 .../sql/watchdog/ForcedMaxOutputRowsRule.scala     | 72 +++++++++++++++++
 .../apache/spark/sql/KyuubiExtensionSuite.scala    | 94 +++++++++++++++++++++-
 4 files changed, 176 insertions(+), 3 deletions(-)

diff --git 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index cb21bcd..57adc3f 100644
--- 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -94,4 +94,12 @@ object KyuubiSQLConf {
       .version("1.4.0")
       .intConf
       .createOptional
+
+  val WATCHDOG_FORCED_MAXOUTPUTROWS =
+    buildConf("spark.sql.watchdog.forcedMaxOutputRows")
+      .doc("Add ForcedMaxOutputRows rule to avoid huge output rows of 
non-limit query " +
+        "unexpectedly, it's optional that works with defined")
+      .version("1.4.0")
+      .intConf
+      .createOptional
 }
diff --git 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index 8110017..c11d65c 100644
--- 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -20,8 +20,8 @@ package org.apache.kyuubi.sql
 import org.apache.spark.sql.SparkSessionExtensions
 
 import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification
-import org.apache.kyuubi.sql.watchdog.MaxHivePartitionStrategy
-import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, 
InsertZorderBeforeWritingHive, ResolveZorder, ZorderSparkSqlExtensionsParser}
+import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, 
MaxHivePartitionStrategy}
+import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, 
InsertZorderBeforeWritingHive}
 import org.apache.kyuubi.sql.zorder.ResolveZorder
 import org.apache.kyuubi.sql.zorder.ZorderSparkSqlExtensionsParser
 
@@ -53,5 +53,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions 
=> Unit) {
     extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
     extensions.injectQueryStagePrepRule(FinalStageConfigIsolation(_))
     extensions.injectPlannerStrategy(MaxHivePartitionStrategy)
+    extensions.injectPostHocResolutionRule(ForcedMaxOutputRowsRule)
   }
 }
diff --git 
a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
new file mode 100644
index 0000000..c2e3ee4
--- /dev/null
+++ 
b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Limit, 
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+/*
+* Add ForcedMaxOutputRows rule for output rows limitation
+* to avoid huge output rows of non_limit query unexpectedly
+* mainly applied to cases as below:
+*
+* case 1:
+* {{{
+*   SELECT [c1, c2, ...]
+* }}}
+*
+* case 2:
+* {{{
+*   WITH CTE AS (
+*   ...)
+* SELECT [c1, c2, ...] FROM CTE ...
+* }}}
+*
+* The Logical Rule add a GlobalLimit node before root project
+* */
+case class ForcedMaxOutputRowsRule(session: SparkSession) extends 
Rule[LogicalPlan] {
+
+  private def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt: Option[Int]): 
Boolean = {
+
+    maxOutputRowsOpt match {
+      case Some(forcedMaxOutputRows) => val supported = p match {
+          case _: Project => true
+          case _: Aggregate => true
+          case Limit(_, _) => true
+          case _ => false
+        }
+        supported && !p.maxRows.exists(_ <= forcedMaxOutputRows)
+      case None => false
+    }
+
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    val maxOutputRowsOpt = 
conf.getConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS)
+    plan match {
+      case p if p.resolved && canInsertLimit(p, maxOutputRowsOpt) => Limit(
+        maxOutputRowsOpt.get, plan)
+      case _ => plan
+    }
+  }
+
+}
diff --git 
a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
 
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
index 85d17df..c94b33f 100644
--- 
a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
+++ 
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import scala.collection.mutable.Set
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Multiply}
-import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
+import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, 
RepartitionByExpression}
 import org.apache.spark.sql.execution.adaptive.{CustomShuffleReaderExec, 
QueryStageExec}
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
ShuffleExchangeLike}
 import org.apache.spark.sql.hive.HiveUtils
@@ -1303,4 +1303,96 @@ class KyuubiExtensionSuite extends 
KyuubiSparkSQLExtensionTest {
       }
     }
   }
+
+  test("test watchdog with query forceMaxOutputRows") {
+
+    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
+
+      assert(sql("SELECT * FROM t1")
+        .queryExecution.analyzed.isInstanceOf[GlobalLimit])
+
+      assert(sql("SELECT * FROM t1 LIMIT 1")
+        .queryExecution.analyzed.asInstanceOf[GlobalLimit].maxRows.contains(1))
+
+      assert(sql("SELECT * FROM t1 LIMIT 11")
+        
.queryExecution.analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
+
+      assert(!sql("SELECT count(*) FROM t1")
+        .queryExecution.analyzed.isInstanceOf[GlobalLimit])
+
+      assert(sql(
+        """
+          |SELECT c1, COUNT(*)
+          |FROM t1
+          |GROUP BY c1
+          |""".stripMargin).queryExecution.analyzed.isInstanceOf[GlobalLimit])
+
+      assert(sql(
+        """
+          |WITH custom_cte AS (
+          |SELECT * FROM t1
+          |)
+          |
+          |SELECT * FROM custom_cte
+          |""".stripMargin).queryExecution
+        .analyzed.isInstanceOf[GlobalLimit])
+
+      assert(sql(
+        """
+          |WITH custom_cte AS (
+          |SELECT * FROM t1
+          |)
+          |
+          |SELECT * FROM custom_cte
+          |LIMIT 1
+          |""".stripMargin).queryExecution
+        .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(1))
+
+      assert(sql(
+        """
+          |WITH custom_cte AS (
+          |SELECT * FROM t1
+          |)
+          |
+          |SELECT * FROM custom_cte
+          |LIMIT 11
+          |""".stripMargin).queryExecution
+        .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
+
+      assert(!sql(
+        """
+          |WITH custom_cte AS (
+          |SELECT * FROM t1
+          |)
+          |
+          |SELECT COUNT(*) FROM custom_cte
+          |""".stripMargin).queryExecution
+        .analyzed.isInstanceOf[GlobalLimit])
+
+      assert(sql(
+        """
+          |WITH custom_cte AS (
+          |SELECT * FROM t1
+          |)
+          |
+          |SELECT c1, COUNT(*)
+          |FROM custom_cte
+          |GROUP BY c1
+          |""".stripMargin).queryExecution
+        .analyzed.isInstanceOf[GlobalLimit])
+
+      assert(sql(
+        """
+          |WITH custom_cte AS (
+          |SELECT * FROM t1
+          |)
+          |
+          |SELECT c1, COUNT(*)
+          |FROM custom_cte
+          |GROUP BY c1
+          |LIMIT 11
+          |""".stripMargin).queryExecution
+        .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
+    }
+  }
 }

Reply via email to