This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f4d7ca734 [KYUUBI #6983] Remove support for 
spark.sql.watchdog.forcedMaxOutputRows
3f4d7ca734 is described below

commit 3f4d7ca73470ee67d566f2953e2ca9831879ea5d
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Mar 17 16:02:27 2025 +0800

    [KYUUBI #6983] Remove support for spark.sql.watchdog.forcedMaxOutputRows
    
    ### Why are the changes needed?
    
    The `spark.sql.watchdog.forcedMaxOutputRows` feature is a bit hacky: it is
    essentially a hand-rolled "limit pushdown". We already have a simpler and
    more reliable way to achieve the same result via
    `kyuubi.operation.result.max.rows`.
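    
    For illustration only (not part of this patch), the replacement is a plain
    Kyuubi configuration and needs no Spark extension jar. A minimal sketch,
    assuming the cap is set server-wide in `kyuubi-defaults.conf`:
    
    ```properties
    # kyuubi-defaults.conf: cap the rows returned to the client per operation;
    # the default 0 means no limit
    kyuubi.operation.result.max.rows=10
    ```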
    
    ### How was this patch tested?
    
    Pass GHA.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #6983 from pan3793/rm-forcedMaxOutputRows.
    
    Closes #6983
    
    5e0707955 [Cheng Pan] Remove support for 
spark.sql.watchdog.forcedMaxOutputRows
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/deployment/migration-guide.md                 |   4 +
 .../org/apache/kyuubi/sql/KyuubiSQLConf.scala      |   8 -
 .../kyuubi/sql/KyuubiSparkSQLExtension.scala       |   3 +-
 .../sql/watchdog/ForcedMaxOutputRowsBase.scala     |  90 -----
 .../sql/watchdog/ForcedMaxOutputRowsRule.scala     |  46 ---
 .../org/apache/spark/sql/WatchDogSuiteBase.scala   | 389 ---------------------
 .../org/apache/kyuubi/sql/KyuubiSQLConf.scala      |   8 -
 .../kyuubi/sql/KyuubiSparkSQLExtension.scala       |   3 +-
 .../sql/watchdog/ForcedMaxOutputRowsBase.scala     |  90 -----
 .../sql/watchdog/ForcedMaxOutputRowsRule.scala     |  46 ---
 .../org/apache/spark/sql/WatchDogSuiteBase.scala   | 389 ---------------------
 .../org/apache/kyuubi/sql/KyuubiSQLConf.scala      |   8 -
 .../kyuubi/sql/KyuubiSparkSQLExtension.scala       |   3 +-
 .../sql/watchdog/ForcedMaxOutputRowsBase.scala     |  90 -----
 .../sql/watchdog/ForcedMaxOutputRowsRule.scala     |  46 ---
 .../scala/org/apache/spark/sql/WatchDogSuite.scala | 389 ---------------------
 16 files changed, 7 insertions(+), 1605 deletions(-)

diff --git a/docs/deployment/migration-guide.md 
b/docs/deployment/migration-guide.md
index 6899359c8a..3570da71ff 100644
--- a/docs/deployment/migration-guide.md
+++ b/docs/deployment/migration-guide.md
@@ -17,6 +17,10 @@
 
 # Kyuubi Migration Guide
 
+## Upgrading from Kyuubi 1.10 to 1.11
+
+* Since Kyuubi 1.11, the configuration 
`spark.sql.watchdog.forcedMaxOutputRows` provided by the Kyuubi Spark 
extension is removed; consider using `kyuubi.operation.result.max.rows` 
instead. Note that the latter works without installing the Kyuubi Spark 
extension.
+
 ## Upgrading from Kyuubi 1.9 to 1.10
 
 * Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the 
future, please use `kyuubi-beeline` instead.
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index e25d8a6777..e72a6c0735 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -123,14 +123,6 @@ object KyuubiSQLConf {
       .bytesConf(ByteUnit.BYTE)
       .createOptional
 
-  val WATCHDOG_FORCED_MAXOUTPUTROWS =
-    buildConf("spark.sql.watchdog.forcedMaxOutputRows")
-      .doc("Add ForcedMaxOutputRows rule to avoid huge output rows of 
non-limit query " +
-        "unexpectedly, it's optional that works with defined")
-      .version("1.4.0")
-      .intConf
-      .createOptional
-
   val DROP_IGNORE_NONEXISTENT =
     buildConf("spark.sql.optimizer.dropIgnoreNonExistent")
       .doc("Do not report an error if DROP 
DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies " +
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index fd11fb5f57..33ff3e3177 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
 
 import org.apache.spark.sql.{FinalStageResourceManager, 
InjectCustomResourceProfile, SparkSessionExtensions}
 
-import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, 
KyuubiUnsupportedOperationsCheck, MaxScanStrategy}
+import org.apache.kyuubi.sql.watchdog.{KyuubiUnsupportedOperationsCheck, 
MaxScanStrategy}
 
 // scalastyle:off line.size.limit
 /**
@@ -38,7 +38,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions 
=> Unit) {
 
     // watchdog extension
     extensions.injectCheckRule(_ => KyuubiUnsupportedOperationsCheck)
-    extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
     extensions.injectPlannerStrategy(MaxScanStrategy)
 
     extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
deleted file mode 100644
index dd329bd659..0000000000
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.DataWritingCommand
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/*
- * Add ForcedMaxOutputRows rule for output rows limitation
- * to avoid huge output rows of non_limit query unexpectedly
- * mainly applied to cases as below:
- *
- * case 1:
- * {{{
- *   SELECT [c1, c2, ...]
- * }}}
- *
- * case 2:
- * {{{
- *   WITH CTE AS (
- *   ...)
- * SELECT [c1, c2, ...] FROM CTE ...
- * }}}
- *
- * The Logical Rule add a GlobalLimit node before root project
- * */
-trait ForcedMaxOutputRowsBase extends Rule[LogicalPlan] {
-
-  protected def isChildAggregate(a: Aggregate): Boolean
-
-  protected def canInsertLimitInner(p: LogicalPlan): Boolean = p match {
-    case Aggregate(_, Alias(_, "havingCondition") :: Nil, _) => false
-    case agg: Aggregate => !isChildAggregate(agg)
-    case _: RepartitionByExpression => true
-    case _: Distinct => true
-    case _: Filter => true
-    case _: Project => true
-    case Limit(_, _) => true
-    case _: Sort => true
-    case Union(children, _, _) =>
-      if (children.exists(_.isInstanceOf[DataWritingCommand])) {
-        false
-      } else {
-        true
-      }
-    case _: MultiInstanceRelation => true
-    case _: Join => true
-    case _ => false
-  }
-
-  protected def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt: Option[Int]): 
Boolean = {
-    maxOutputRowsOpt match {
-      case Some(forcedMaxOutputRows) if forcedMaxOutputRows >= 0 =>
-        canInsertLimitInner(p) && !p.maxRows.exists(_ <= forcedMaxOutputRows)
-      case _ => false
-    }
-  }
-
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    val maxOutputRowsOpt = 
conf.getConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS)
-    plan match {
-      case p if p.resolved && canInsertLimit(p, maxOutputRowsOpt) =>
-        Limit(
-          maxOutputRowsOpt.get,
-          plan)
-      case _ => plan
-    }
-  }
-}
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
deleted file mode 100644
index a3d990b109..0000000000
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CommandResult, 
LogicalPlan, Union, WithCTE}
-import org.apache.spark.sql.execution.command.DataWritingCommand
-
-case class ForcedMaxOutputRowsRule(sparkSession: SparkSession) extends 
ForcedMaxOutputRowsBase {
-
-  override protected def isChildAggregate(a: Aggregate): Boolean = false
-
-  override protected def canInsertLimitInner(p: LogicalPlan): Boolean = p 
match {
-    case WithCTE(plan, _) => this.canInsertLimitInner(plan)
-    case plan: LogicalPlan => plan match {
-        case Union(children, _, _) => !children.exists {
-            case _: DataWritingCommand => true
-            case p: CommandResult if 
p.commandLogicalPlan.isInstanceOf[DataWritingCommand] => true
-            case _ => false
-          }
-        case _ => super.canInsertLimitInner(plan)
-      }
-  }
-
-  override protected def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt: 
Option[Int]): Boolean = {
-    p match {
-      case WithCTE(plan, _) => this.canInsertLimit(plan, maxOutputRowsOpt)
-      case _ => super.canInsertLimit(p, maxOutputRowsOpt)
-    }
-  }
-}
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
index 22c998d3b0..b5f8c67261 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
@@ -22,7 +22,6 @@ import java.io.File
 import scala.collection.JavaConverters._
 
 import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 
 import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
@@ -96,394 +95,6 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest 
{
     }
   }
 
-  test("test watchdog: simple SELECT STATEMENT") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      List("", "ORDER BY c1", "ORDER BY c2").foreach { sort =>
-        List("", " DISTINCT").foreach { distinct =>
-          assert(sql(
-            s"""
-               |SELECT $distinct *
-               |FROM t1
-               |$sort
-               
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        List("", "ORDER BY c1", "ORDER BY c2").foreach { sort =>
-          List("", "DISTINCT").foreach { distinct =>
-            assert(sql(
-              s"""
-                 |SELECT $distinct *
-                 |FROM t1
-                 |$sort
-                 |LIMIT $limit
-                 
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
-          }
-        }
-      }
-    }
-  }
-
-  test("test watchdog: SELECT ... WITH AGGREGATE STATEMENT ") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      assert(!sql("SELECT count(*) FROM t1")
-        .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-
-      val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1", 
"ORDER BY c1, cnt")
-      val havingConditions = List("", "HAVING cnt > 1")
-
-      havingConditions.foreach { having =>
-        sorts.foreach { sort =>
-          assert(sql(
-            s"""
-               |SELECT c1, COUNT(*) as cnt
-               |FROM t1
-               |GROUP BY c1
-               |$having
-               |$sort
-               
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        havingConditions.foreach { having =>
-          sorts.foreach { sort =>
-            assert(sql(
-              s"""
-                 |SELECT c1, COUNT(*) as cnt
-                 |FROM t1
-                 |GROUP BY c1
-                 |$having
-                 |$sort
-                 |LIMIT $limit
-                 
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
-          }
-        }
-      }
-    }
-  }
-
-  test("test watchdog: SELECT with CTE forceMaxOutputRows") {
-    // simple CTE
-    val q1 =
-      """
-        |WITH t2 AS (
-        |    SELECT * FROM t1
-        |)
-        |""".stripMargin
-
-    // nested CTE
-    val q2 =
-      """
-        |WITH
-        |    t AS (SELECT * FROM t1),
-        |    t2 AS (
-        |        WITH t3 AS (SELECT * FROM t1)
-        |        SELECT * FROM t3
-        |    )
-        |""".stripMargin
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      val sorts = List("", "ORDER BY c1", "ORDER BY c2")
-
-      sorts.foreach { sort =>
-        Seq(q1, q2).foreach { withQuery =>
-          assert(sql(
-            s"""
-               |$withQuery
-               |SELECT * FROM t2
-               |$sort
-               
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        sorts.foreach { sort =>
-          Seq(q1, q2).foreach { withQuery =>
-            assert(sql(
-              s"""
-                 |$withQuery
-                 |SELECT * FROM t2
-                 |$sort
-                 |LIMIT $limit
-                 
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
-          }
-        }
-      }
-    }
-  }
-
-  test("test watchdog: SELECT AGGREGATE WITH CTE forceMaxOutputRows") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      assert(!sql(
-        """
-          |WITH custom_cte AS (
-          |SELECT * FROM t1
-          |)
-          |
-          |SELECT COUNT(*)
-          |FROM custom_cte
-          |""".stripMargin).queryExecution
-        .analyzed.isInstanceOf[GlobalLimit])
-
-      val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1", 
"ORDER BY c1, cnt")
-      val havingConditions = List("", "HAVING cnt > 1")
-
-      havingConditions.foreach { having =>
-        sorts.foreach { sort =>
-          assert(sql(
-            s"""
-               |WITH custom_cte AS (
-               |SELECT * FROM t1
-               |)
-               |
-               |SELECT c1, COUNT(*) as cnt
-               |FROM custom_cte
-               |GROUP BY c1
-               |$having
-               |$sort
-               
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        havingConditions.foreach { having =>
-          sorts.foreach { sort =>
-            assert(sql(
-              s"""
-                 |WITH custom_cte AS (
-                 |SELECT * FROM t1
-                 |)
-                 |
-                 |SELECT c1, COUNT(*) as cnt
-                 |FROM custom_cte
-                 |GROUP BY c1
-                 |$having
-                 |$sort
-                 |LIMIT $limit
-                 
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
-          }
-        }
-      }
-    }
-  }
-
-  test("test watchdog: UNION Statement for forceMaxOutputRows") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      List("", "ALL").foreach { x =>
-        assert(sql(
-          s"""
-             |SELECT c1, c2 FROM t1
-             |UNION $x
-             |SELECT c1, c2 FROM t2
-             |UNION $x
-             |SELECT c1, c2 FROM t3
-             |""".stripMargin)
-          .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-      }
-
-      val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1", 
"ORDER BY c1, cnt")
-      val havingConditions = List("", "HAVING cnt > 1")
-
-      List("", "ALL").foreach { x =>
-        havingConditions.foreach { having =>
-          sorts.foreach { sort =>
-            assert(sql(
-              s"""
-                 |SELECT c1, count(c2) as cnt
-                 |FROM t1
-                 |GROUP BY c1
-                 |$having
-                 |UNION $x
-                 |SELECT c1, COUNT(c2) as cnt
-                 |FROM t2
-                 |GROUP BY c1
-                 |$having
-                 |UNION $x
-                 |SELECT c1, COUNT(c2) as cnt
-                 |FROM t3
-                 |GROUP BY c1
-                 |$having
-                 |$sort
-                 |""".stripMargin)
-              .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-          }
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        assert(sql(
-          s"""
-             |SELECT c1, c2 FROM t1
-             |UNION
-             |SELECT c1, c2 FROM t2
-             |UNION
-             |SELECT c1, c2 FROM t3
-             |LIMIT $limit
-             |""".stripMargin)
-          .queryExecution.optimizedPlan.maxRows.contains(expected))
-      }
-    }
-  }
-
-  test("test watchdog: Select View Statement for forceMaxOutputRows") {
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "3") {
-      withTable("tmp_table", "tmp_union") {
-        withView("tmp_view", "tmp_view2") {
-          sql(s"create table tmp_table (a int, b int)")
-          sql(s"insert into tmp_table values 
(1,10),(2,20),(3,30),(4,40),(5,50)")
-          sql(s"create table tmp_union (a int, b int)")
-          sql(s"insert into tmp_union values 
(6,60),(7,70),(8,80),(9,90),(10,100)")
-          sql(s"create view tmp_view2 as select * from tmp_union")
-          assert(!sql(
-            s"""
-               |CREATE VIEW tmp_view
-               |as
-               |SELECT * FROM
-               |tmp_table
-               |""".stripMargin)
-            .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-
-          assert(sql(
-            s"""
-               |SELECT * FROM
-               |tmp_view
-               |""".stripMargin)
-            .queryExecution.optimizedPlan.maxRows.contains(3))
-
-          assert(sql(
-            s"""
-               |SELECT * FROM
-               |tmp_view
-               |limit 11
-               |""".stripMargin)
-            .queryExecution.optimizedPlan.maxRows.contains(3))
-
-          assert(sql(
-            s"""
-               |SELECT * FROM
-               |(select * from tmp_view
-               |UNION
-               |select * from tmp_view2)
-               |ORDER BY a
-               |DESC
-               |""".stripMargin)
-            .collect().head.get(0) === 10)
-        }
-      }
-    }
-  }
-
-  test("test watchdog: Insert Statement for forceMaxOutputRows") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-      withTable("tmp_table", "tmp_insert") {
-        spark.sql(s"create table tmp_table (a int, b int)")
-        spark.sql(s"insert into tmp_table values 
(1,10),(2,20),(3,30),(4,40),(5,50)")
-        val multiInsertTableName1: String = "tmp_tbl1"
-        val multiInsertTableName2: String = "tmp_tbl2"
-        sql(s"drop table if exists $multiInsertTableName1")
-        sql(s"drop table if exists $multiInsertTableName2")
-        sql(s"create table $multiInsertTableName1 like tmp_table")
-        sql(s"create table $multiInsertTableName2 like tmp_table")
-        assert(!sql(
-          s"""
-             |FROM tmp_table
-             |insert into $multiInsertTableName1 select * limit 2
-             |insert into $multiInsertTableName2 select *
-             |""".stripMargin)
-          .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-      }
-    }
-  }
-
-  test("test watchdog: Distribute by for forceMaxOutputRows") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-      withTable("tmp_table") {
-        spark.sql(s"create table tmp_table (a int, b int)")
-        spark.sql(s"insert into tmp_table values 
(1,10),(2,20),(3,30),(4,40),(5,50)")
-        assert(sql(
-          s"""
-             |SELECT *
-             |FROM tmp_table
-             |DISTRIBUTE BY a
-             |""".stripMargin)
-          .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-      }
-    }
-  }
-
-  test("test watchdog: Subquery for forceMaxOutputRows") {
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "1") {
-      withTable("tmp_table1") {
-        sql("CREATE TABLE spark_catalog.`default`.tmp_table1(KEY INT, VALUE 
STRING) USING PARQUET")
-        sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table1 " +
-          "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
-        assert(
-          sql("select * from 
tmp_table1").queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        val testSqlText =
-          """
-            |select count(*)
-            |from tmp_table1
-            |where tmp_table1.key in (
-            |select distinct tmp_table1.key
-            |from tmp_table1
-            |where tmp_table1.value = "aa"
-            |)
-            |""".stripMargin
-        val plan = sql(testSqlText).queryExecution.optimizedPlan
-        assert(!findGlobalLimit(plan))
-        checkAnswer(sql(testSqlText), Row(3) :: Nil)
-      }
-
-      def findGlobalLimit(plan: LogicalPlan): Boolean = plan match {
-        case _: GlobalLimit => true
-        case p if p.children.isEmpty => false
-        case p => p.children.exists(findGlobalLimit)
-      }
-
-    }
-  }
-
-  test("test watchdog: Join for forceMaxOutputRows") {
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "1") {
-      withTable("tmp_table1", "tmp_table2") {
-        sql("CREATE TABLE spark_catalog.`default`.tmp_table1(KEY INT, VALUE 
STRING) USING PARQUET")
-        sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table1 " +
-          "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
-        sql("CREATE TABLE spark_catalog.`default`.tmp_table2(KEY INT, VALUE 
STRING) USING PARQUET")
-        sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table2 " +
-          "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
-        val testSqlText =
-          """
-            |select a.*,b.*
-            |from tmp_table1 a
-            |join
-            |tmp_table2 b
-            |on a.KEY = b.KEY
-            |""".stripMargin
-        val plan = sql(testSqlText).queryExecution.optimizedPlan
-        assert(findGlobalLimit(plan))
-      }
-
-      def findGlobalLimit(plan: LogicalPlan): Boolean = plan match {
-        case _: GlobalLimit => true
-        case p if p.children.isEmpty => false
-        case p => p.children.exists(findGlobalLimit)
-      }
-    }
-  }
-
   private def checkMaxFileSize(tableSize: Long, nonPartTableSize: Long): Unit 
= {
     withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> 
tableSize.toString) {
       checkAnswer(sql("SELECT count(distinct(p)) FROM test"), Row(10) :: Nil)
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index e25d8a6777..e72a6c0735 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -123,14 +123,6 @@ object KyuubiSQLConf {
       .bytesConf(ByteUnit.BYTE)
       .createOptional
 
-  val WATCHDOG_FORCED_MAXOUTPUTROWS =
-    buildConf("spark.sql.watchdog.forcedMaxOutputRows")
-      .doc("Add ForcedMaxOutputRows rule to avoid huge output rows of 
non-limit query " +
-        "unexpectedly, it's optional that works with defined")
-      .version("1.4.0")
-      .intConf
-      .createOptional
-
   val DROP_IGNORE_NONEXISTENT =
     buildConf("spark.sql.optimizer.dropIgnoreNonExistent")
       .doc("Do not report an error if DROP 
DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies " +
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index fd11fb5f57..33ff3e3177 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
 
 import org.apache.spark.sql.{FinalStageResourceManager, 
InjectCustomResourceProfile, SparkSessionExtensions}
 
-import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, 
KyuubiUnsupportedOperationsCheck, MaxScanStrategy}
+import org.apache.kyuubi.sql.watchdog.{KyuubiUnsupportedOperationsCheck, 
MaxScanStrategy}
 
 // scalastyle:off line.size.limit
 /**
@@ -38,7 +38,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions 
=> Unit) {
 
     // watchdog extension
     extensions.injectCheckRule(_ => KyuubiUnsupportedOperationsCheck)
-    extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
     extensions.injectPlannerStrategy(MaxScanStrategy)
 
     extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
deleted file mode 100644
index dd329bd659..0000000000
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.DataWritingCommand
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/*
- * Add ForcedMaxOutputRows rule for output rows limitation
- * to avoid huge output rows of non_limit query unexpectedly
- * mainly applied to cases as below:
- *
- * case 1:
- * {{{
- *   SELECT [c1, c2, ...]
- * }}}
- *
- * case 2:
- * {{{
- *   WITH CTE AS (
- *   ...)
- * SELECT [c1, c2, ...] FROM CTE ...
- * }}}
- *
- * The Logical Rule add a GlobalLimit node before root project
- * */
-trait ForcedMaxOutputRowsBase extends Rule[LogicalPlan] {
-
-  protected def isChildAggregate(a: Aggregate): Boolean
-
-  protected def canInsertLimitInner(p: LogicalPlan): Boolean = p match {
-    case Aggregate(_, Alias(_, "havingCondition") :: Nil, _) => false
-    case agg: Aggregate => !isChildAggregate(agg)
-    case _: RepartitionByExpression => true
-    case _: Distinct => true
-    case _: Filter => true
-    case _: Project => true
-    case Limit(_, _) => true
-    case _: Sort => true
-    case Union(children, _, _) =>
-      if (children.exists(_.isInstanceOf[DataWritingCommand])) {
-        false
-      } else {
-        true
-      }
-    case _: MultiInstanceRelation => true
-    case _: Join => true
-    case _ => false
-  }
-
-  protected def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt: Option[Int]): 
Boolean = {
-    maxOutputRowsOpt match {
-      case Some(forcedMaxOutputRows) if forcedMaxOutputRows >= 0 =>
-        canInsertLimitInner(p) && !p.maxRows.exists(_ <= forcedMaxOutputRows)
-      case _ => false
-    }
-  }
-
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    val maxOutputRowsOpt = 
conf.getConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS)
-    plan match {
-      case p if p.resolved && canInsertLimit(p, maxOutputRowsOpt) =>
-        Limit(
-          maxOutputRowsOpt.get,
-          plan)
-      case _ => plan
-    }
-  }
-}
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
deleted file mode 100644
index a3d990b109..0000000000
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CommandResult, 
LogicalPlan, Union, WithCTE}
-import org.apache.spark.sql.execution.command.DataWritingCommand
-
-case class ForcedMaxOutputRowsRule(sparkSession: SparkSession) extends 
ForcedMaxOutputRowsBase {
-
-  override protected def isChildAggregate(a: Aggregate): Boolean = false
-
-  override protected def canInsertLimitInner(p: LogicalPlan): Boolean = p 
match {
-    case WithCTE(plan, _) => this.canInsertLimitInner(plan)
-    case plan: LogicalPlan => plan match {
-        case Union(children, _, _) => !children.exists {
-            case _: DataWritingCommand => true
-            case p: CommandResult if 
p.commandLogicalPlan.isInstanceOf[DataWritingCommand] => true
-            case _ => false
-          }
-        case _ => super.canInsertLimitInner(plan)
-      }
-  }
-
-  override protected def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt: 
Option[Int]): Boolean = {
-    p match {
-      case WithCTE(plan, _) => this.canInsertLimit(plan, maxOutputRowsOpt)
-      case _ => super.canInsertLimit(p, maxOutputRowsOpt)
-    }
-  }
-}
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
index aec51cbd37..a392c96942 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
@@ -22,7 +22,6 @@ import java.io.File
 import scala.collection.JavaConverters._
 
 import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 
 import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
@@ -96,394 +95,6 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest 
{
     }
   }
 
-  test("test watchdog: simple SELECT STATEMENT") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      List("", "ORDER BY c1", "ORDER BY c2").foreach { sort =>
-        List("", " DISTINCT").foreach { distinct =>
-          assert(sql(
-            s"""
-               |SELECT $distinct *
-               |FROM t1
-               |$sort
-               
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        List("", "ORDER BY c1", "ORDER BY c2").foreach { sort =>
-          List("", "DISTINCT").foreach { distinct =>
-            assert(sql(
-              s"""
-                 |SELECT $distinct *
-                 |FROM t1
-                 |$sort
-                 |LIMIT $limit
-                 
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
-          }
-        }
-      }
-    }
-  }
-
-  test("test watchdog: SELECT ... WITH AGGREGATE STATEMENT ") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      assert(!sql("SELECT count(*) FROM t1")
-        .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-
-      val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1", 
"ORDER BY c1, cnt")
-      val havingConditions = List("", "HAVING cnt > 1")
-
-      havingConditions.foreach { having =>
-        sorts.foreach { sort =>
-          assert(sql(
-            s"""
-               |SELECT c1, COUNT(*) as cnt
-               |FROM t1
-               |GROUP BY c1
-               |$having
-               |$sort
-               
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        havingConditions.foreach { having =>
-          sorts.foreach { sort =>
-            assert(sql(
-              s"""
-                 |SELECT c1, COUNT(*) as cnt
-                 |FROM t1
-                 |GROUP BY c1
-                 |$having
-                 |$sort
-                 |LIMIT $limit
-                 
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
-          }
-        }
-      }
-    }
-  }
-
-  test("test watchdog: SELECT with CTE forceMaxOutputRows") {
-    // simple CTE
-    val q1 =
-      """
-        |WITH t2 AS (
-        |    SELECT * FROM t1
-        |)
-        |""".stripMargin
-
-    // nested CTE
-    val q2 =
-      """
-        |WITH
-        |    t AS (SELECT * FROM t1),
-        |    t2 AS (
-        |        WITH t3 AS (SELECT * FROM t1)
-        |        SELECT * FROM t3
-        |    )
-        |""".stripMargin
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      val sorts = List("", "ORDER BY c1", "ORDER BY c2")
-
-      sorts.foreach { sort =>
-        Seq(q1, q2).foreach { withQuery =>
-          assert(sql(
-            s"""
-               |$withQuery
-               |SELECT * FROM t2
-               |$sort
-               
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        sorts.foreach { sort =>
-          Seq(q1, q2).foreach { withQuery =>
-            assert(sql(
-              s"""
-                 |$withQuery
-                 |SELECT * FROM t2
-                 |$sort
-                 |LIMIT $limit
-                 
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
-          }
-        }
-      }
-    }
-  }
-
-  test("test watchdog: SELECT AGGREGATE WITH CTE forceMaxOutputRows") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      assert(!sql(
-        """
-          |WITH custom_cte AS (
-          |SELECT * FROM t1
-          |)
-          |
-          |SELECT COUNT(*)
-          |FROM custom_cte
-          |""".stripMargin).queryExecution
-        .analyzed.isInstanceOf[GlobalLimit])
-
-      val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1", 
"ORDER BY c1, cnt")
-      val havingConditions = List("", "HAVING cnt > 1")
-
-      havingConditions.foreach { having =>
-        sorts.foreach { sort =>
-          assert(sql(
-            s"""
-               |WITH custom_cte AS (
-               |SELECT * FROM t1
-               |)
-               |
-               |SELECT c1, COUNT(*) as cnt
-               |FROM custom_cte
-               |GROUP BY c1
-               |$having
-               |$sort
-               
|""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        havingConditions.foreach { having =>
-          sorts.foreach { sort =>
-            assert(sql(
-              s"""
-                 |WITH custom_cte AS (
-                 |SELECT * FROM t1
-                 |)
-                 |
-                 |SELECT c1, COUNT(*) as cnt
-                 |FROM custom_cte
-                 |GROUP BY c1
-                 |$having
-                 |$sort
-                 |LIMIT $limit
-                 
|""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
-          }
-        }
-      }
-    }
-  }
-
-  test("test watchdog: UNION Statement for forceMaxOutputRows") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      List("", "ALL").foreach { x =>
-        assert(sql(
-          s"""
-             |SELECT c1, c2 FROM t1
-             |UNION $x
-             |SELECT c1, c2 FROM t2
-             |UNION $x
-             |SELECT c1, c2 FROM t3
-             |""".stripMargin)
-          .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-      }
-
-      val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1", 
"ORDER BY c1, cnt")
-      val havingConditions = List("", "HAVING cnt > 1")
-
-      List("", "ALL").foreach { x =>
-        havingConditions.foreach { having =>
-          sorts.foreach { sort =>
-            assert(sql(
-              s"""
-                 |SELECT c1, count(c2) as cnt
-                 |FROM t1
-                 |GROUP BY c1
-                 |$having
-                 |UNION $x
-                 |SELECT c1, COUNT(c2) as cnt
-                 |FROM t2
-                 |GROUP BY c1
-                 |$having
-                 |UNION $x
-                 |SELECT c1, COUNT(c2) as cnt
-                 |FROM t3
-                 |GROUP BY c1
-                 |$having
-                 |$sort
-                 |""".stripMargin)
-              .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-          }
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        assert(sql(
-          s"""
-             |SELECT c1, c2 FROM t1
-             |UNION
-             |SELECT c1, c2 FROM t2
-             |UNION
-             |SELECT c1, c2 FROM t3
-             |LIMIT $limit
-             |""".stripMargin)
-          .queryExecution.optimizedPlan.maxRows.contains(expected))
-      }
-    }
-  }
-
-  test("test watchdog: Select View Statement for forceMaxOutputRows") {
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "3") {
-      withTable("tmp_table", "tmp_union") {
-        withView("tmp_view", "tmp_view2") {
-          sql(s"create table tmp_table (a int, b int)")
-          sql(s"insert into tmp_table values 
(1,10),(2,20),(3,30),(4,40),(5,50)")
-          sql(s"create table tmp_union (a int, b int)")
-          sql(s"insert into tmp_union values 
(6,60),(7,70),(8,80),(9,90),(10,100)")
-          sql(s"create view tmp_view2 as select * from tmp_union")
-          assert(!sql(
-            s"""
-               |CREATE VIEW tmp_view
-               |as
-               |SELECT * FROM
-               |tmp_table
-               |""".stripMargin)
-            .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-
-          assert(sql(
-            s"""
-               |SELECT * FROM
-               |tmp_view
-               |""".stripMargin)
-            .queryExecution.optimizedPlan.maxRows.contains(3))
-
-          assert(sql(
-            s"""
-               |SELECT * FROM
-               |tmp_view
-               |limit 11
-               |""".stripMargin)
-            .queryExecution.optimizedPlan.maxRows.contains(3))
-
-          assert(sql(
-            s"""
-               |SELECT * FROM
-               |(select * from tmp_view
-               |UNION
-               |select * from tmp_view2)
-               |ORDER BY a
-               |DESC
-               |""".stripMargin)
-            .collect().head.get(0) === 10)
-        }
-      }
-    }
-  }
-
-  test("test watchdog: Insert Statement for forceMaxOutputRows") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-      withTable("tmp_table", "tmp_insert") {
-        spark.sql(s"create table tmp_table (a int, b int)")
-        spark.sql(s"insert into tmp_table values 
(1,10),(2,20),(3,30),(4,40),(5,50)")
-        val multiInsertTableName1: String = "tmp_tbl1"
-        val multiInsertTableName2: String = "tmp_tbl2"
-        sql(s"drop table if exists $multiInsertTableName1")
-        sql(s"drop table if exists $multiInsertTableName2")
-        sql(s"create table $multiInsertTableName1 like tmp_table")
-        sql(s"create table $multiInsertTableName2 like tmp_table")
-        assert(!sql(
-          s"""
-             |FROM tmp_table
-             |insert into $multiInsertTableName1 select * limit 2
-             |insert into $multiInsertTableName2 select *
-             |""".stripMargin)
-          .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-      }
-    }
-  }
-
-  test("test watchdog: Distribute by for forceMaxOutputRows") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-      withTable("tmp_table") {
-        spark.sql(s"create table tmp_table (a int, b int)")
-        spark.sql(s"insert into tmp_table values 
(1,10),(2,20),(3,30),(4,40),(5,50)")
-        assert(sql(
-          s"""
-             |SELECT *
-             |FROM tmp_table
-             |DISTRIBUTE BY a
-             |""".stripMargin)
-          .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-      }
-    }
-  }
-
-  test("test watchdog: Subquery for forceMaxOutputRows") {
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "1") {
-      withTable("tmp_table1") {
-        sql("CREATE TABLE spark_catalog.`default`.tmp_table1(KEY INT, VALUE 
STRING) USING PARQUET")
-        sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table1 " +
-          "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
-        assert(
-          sql("select * from 
tmp_table1").queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        val testSqlText =
-          """
-            |select count(*)
-            |from tmp_table1
-            |where tmp_table1.key in (
-            |select distinct tmp_table1.key
-            |from tmp_table1
-            |where tmp_table1.value = "aa"
-            |)
-            |""".stripMargin
-        val plan = sql(testSqlText).queryExecution.optimizedPlan
-        assert(!findGlobalLimit(plan))
-        checkAnswer(sql(testSqlText), Row(3) :: Nil)
-      }
-
-      def findGlobalLimit(plan: LogicalPlan): Boolean = plan match {
-        case _: GlobalLimit => true
-        case p if p.children.isEmpty => false
-        case p => p.children.exists(findGlobalLimit)
-      }
-
-    }
-  }
-
-  test("test watchdog: Join for forceMaxOutputRows") {
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "1") {
-      withTable("tmp_table1", "tmp_table2") {
-        sql("CREATE TABLE spark_catalog.`default`.tmp_table1(KEY INT, VALUE 
STRING) USING PARQUET")
-        sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table1 " +
-          "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
-        sql("CREATE TABLE spark_catalog.`default`.tmp_table2(KEY INT, VALUE 
STRING) USING PARQUET")
-        sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table2 " +
-          "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
-        val testSqlText =
-          """
-            |select a.*,b.*
-            |from tmp_table1 a
-            |join
-            |tmp_table2 b
-            |on a.KEY = b.KEY
-            |""".stripMargin
-        val plan = sql(testSqlText).queryExecution.optimizedPlan
-        assert(findGlobalLimit(plan))
-      }
-
-      def findGlobalLimit(plan: LogicalPlan): Boolean = plan match {
-        case _: GlobalLimit => true
-        case p if p.children.isEmpty => false
-        case p => p.children.exists(findGlobalLimit)
-      }
-    }
-  }
-
   private def checkMaxFileSize(tableSize: Long, nonPartTableSize: Long): Unit 
= {
     withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> 
tableSize.toString) {
       checkAnswer(sql("SELECT count(distinct(p)) FROM test"), Row(10) :: Nil)
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index 4218c41fa1..9644537a06 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -123,14 +123,6 @@ object KyuubiSQLConf {
       .bytesConf(ByteUnit.BYTE)
       .createOptional
 
-  val WATCHDOG_FORCED_MAXOUTPUTROWS =
-    buildConf("spark.sql.watchdog.forcedMaxOutputRows")
-      .doc("Add ForcedMaxOutputRows rule to avoid huge output rows of 
non-limit query " +
-        "unexpectedly, it's optional that works with defined")
-      .version("1.4.0")
-      .intConf
-      .createOptional
-
   val DROP_IGNORE_NONEXISTENT =
     buildConf("spark.sql.optimizer.dropIgnoreNonExistent")
       .doc("Do not report an error if DROP 
DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies " +
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index 06fd6291d0..db7f4b6ea3 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
 
 import org.apache.spark.sql.{FinalStageResourceManager, 
InjectCustomResourceProfile, SparkSessionExtensions}
 
-import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, 
KyuubiUnsupportedOperationsCheck, MaxScanStrategy}
+import org.apache.kyuubi.sql.watchdog.{KyuubiUnsupportedOperationsCheck, 
MaxScanStrategy}
 import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, 
InsertZorderBeforeWritingHive, ResolveZorder}
 
 // scalastyle:off line.size.limit
@@ -47,7 +47,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions 
=> Unit) {
 
     // watchdog extension
     extensions.injectCheckRule(_ => KyuubiUnsupportedOperationsCheck)
-    extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
     extensions.injectPlannerStrategy(MaxScanStrategy)
 
     extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
deleted file mode 100644
index dd329bd659..0000000000
--- 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsBase.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.DataWritingCommand
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/*
- * Add ForcedMaxOutputRows rule for output rows limitation
- * to avoid huge output rows of non_limit query unexpectedly
- * mainly applied to cases as below:
- *
- * case 1:
- * {{{
- *   SELECT [c1, c2, ...]
- * }}}
- *
- * case 2:
- * {{{
- *   WITH CTE AS (
- *   ...)
- * SELECT [c1, c2, ...] FROM CTE ...
- * }}}
- *
- * The Logical Rule add a GlobalLimit node before root project
- * */
-trait ForcedMaxOutputRowsBase extends Rule[LogicalPlan] {
-
-  protected def isChildAggregate(a: Aggregate): Boolean
-
-  protected def canInsertLimitInner(p: LogicalPlan): Boolean = p match {
-    case Aggregate(_, Alias(_, "havingCondition") :: Nil, _) => false
-    case agg: Aggregate => !isChildAggregate(agg)
-    case _: RepartitionByExpression => true
-    case _: Distinct => true
-    case _: Filter => true
-    case _: Project => true
-    case Limit(_, _) => true
-    case _: Sort => true
-    case Union(children, _, _) =>
-      if (children.exists(_.isInstanceOf[DataWritingCommand])) {
-        false
-      } else {
-        true
-      }
-    case _: MultiInstanceRelation => true
-    case _: Join => true
-    case _ => false
-  }
-
-  protected def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt: Option[Int]): 
Boolean = {
-    maxOutputRowsOpt match {
-      case Some(forcedMaxOutputRows) if forcedMaxOutputRows >= 0 =>
-        canInsertLimitInner(p) && !p.maxRows.exists(_ <= forcedMaxOutputRows)
-      case _ => false
-    }
-  }
-
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    val maxOutputRowsOpt = 
conf.getConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS)
-    plan match {
-      case p if p.resolved && canInsertLimit(p, maxOutputRowsOpt) =>
-        Limit(
-          maxOutputRowsOpt.get,
-          plan)
-      case _ => plan
-    }
-  }
-}
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
deleted file mode 100644
index a3d990b109..0000000000
--- 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CommandResult, 
LogicalPlan, Union, WithCTE}
-import org.apache.spark.sql.execution.command.DataWritingCommand
-
-case class ForcedMaxOutputRowsRule(sparkSession: SparkSession) extends 
ForcedMaxOutputRowsBase {
-
-  override protected def isChildAggregate(a: Aggregate): Boolean = false
-
-  override protected def canInsertLimitInner(p: LogicalPlan): Boolean = p 
match {
-    case WithCTE(plan, _) => this.canInsertLimitInner(plan)
-    case plan: LogicalPlan => plan match {
-        case Union(children, _, _) => !children.exists {
-            case _: DataWritingCommand => true
-            case p: CommandResult if 
p.commandLogicalPlan.isInstanceOf[DataWritingCommand] => true
-            case _ => false
-          }
-        case _ => super.canInsertLimitInner(plan)
-      }
-  }
-
-  override protected def canInsertLimit(p: LogicalPlan, maxOutputRowsOpt: 
Option[Int]): Boolean = {
-    p match {
-      case WithCTE(plan, _) => this.canInsertLimit(plan, maxOutputRowsOpt)
-      case _ => super.canInsertLimit(p, maxOutputRowsOpt)
-    }
-  }
-}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
index 87d18a2a44..0cf8a6a183 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
@@ -22,7 +22,6 @@ import java.io.File
 import scala.collection.JavaConverters._
 
 import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 
 import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
@@ -96,394 +95,6 @@ class WatchDogSuite extends KyuubiSparkSQLExtensionTest {
     }
   }
 
-  test("test watchdog: simple SELECT STATEMENT") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      List("", "ORDER BY c1", "ORDER BY c2").foreach { sort =>
-        List("", " DISTINCT").foreach { distinct =>
-          assert(sql(
-            s"""
-               |SELECT $distinct *
-               |FROM t1
-               |$sort
-               |""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        List("", "ORDER BY c1", "ORDER BY c2").foreach { sort =>
-          List("", "DISTINCT").foreach { distinct =>
-            assert(sql(
-              s"""
-                 |SELECT $distinct *
-                 |FROM t1
-                 |$sort
-                 |LIMIT $limit
-                 |""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
-          }
-        }
-      }
-    }
-  }
-
-  test("test watchdog: SELECT ... WITH AGGREGATE STATEMENT ") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      assert(!sql("SELECT count(*) FROM t1")
-        .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-
-      val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1", "ORDER BY c1, cnt")
-      val havingConditions = List("", "HAVING cnt > 1")
-
-      havingConditions.foreach { having =>
-        sorts.foreach { sort =>
-          assert(sql(
-            s"""
-               |SELECT c1, COUNT(*) as cnt
-               |FROM t1
-               |GROUP BY c1
-               |$having
-               |$sort
-               |""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        havingConditions.foreach { having =>
-          sorts.foreach { sort =>
-            assert(sql(
-              s"""
-                 |SELECT c1, COUNT(*) as cnt
-                 |FROM t1
-                 |GROUP BY c1
-                 |$having
-                 |$sort
-                 |LIMIT $limit
-                 |""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
-          }
-        }
-      }
-    }
-  }
-
-  test("test watchdog: SELECT with CTE forceMaxOutputRows") {
-    // simple CTE
-    val q1 =
-      """
-        |WITH t2 AS (
-        |    SELECT * FROM t1
-        |)
-        |""".stripMargin
-
-    // nested CTE
-    val q2 =
-      """
-        |WITH
-        |    t AS (SELECT * FROM t1),
-        |    t2 AS (
-        |        WITH t3 AS (SELECT * FROM t1)
-        |        SELECT * FROM t3
-        |    )
-        |""".stripMargin
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      val sorts = List("", "ORDER BY c1", "ORDER BY c2")
-
-      sorts.foreach { sort =>
-        Seq(q1, q2).foreach { withQuery =>
-          assert(sql(
-            s"""
-               |$withQuery
-               |SELECT * FROM t2
-               |$sort
-               |""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        sorts.foreach { sort =>
-          Seq(q1, q2).foreach { withQuery =>
-            assert(sql(
-              s"""
-                 |$withQuery
-                 |SELECT * FROM t2
-                 |$sort
-                 |LIMIT $limit
-                 |""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
-          }
-        }
-      }
-    }
-  }
-
-  test("test watchdog: SELECT AGGREGATE WITH CTE forceMaxOutputRows") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      assert(!sql(
-        """
-          |WITH custom_cte AS (
-          |SELECT * FROM t1
-          |)
-          |
-          |SELECT COUNT(*)
-          |FROM custom_cte
-          |""".stripMargin).queryExecution
-        .analyzed.isInstanceOf[GlobalLimit])
-
-      val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1", "ORDER BY c1, cnt")
-      val havingConditions = List("", "HAVING cnt > 1")
-
-      havingConditions.foreach { having =>
-        sorts.foreach { sort =>
-          assert(sql(
-            s"""
-               |WITH custom_cte AS (
-               |SELECT * FROM t1
-               |)
-               |
-               |SELECT c1, COUNT(*) as cnt
-               |FROM custom_cte
-               |GROUP BY c1
-               |$having
-               |$sort
-               |""".stripMargin).queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        havingConditions.foreach { having =>
-          sorts.foreach { sort =>
-            assert(sql(
-              s"""
-                 |WITH custom_cte AS (
-                 |SELECT * FROM t1
-                 |)
-                 |
-                 |SELECT c1, COUNT(*) as cnt
-                 |FROM custom_cte
-                 |GROUP BY c1
-                 |$having
-                 |$sort
-                 |LIMIT $limit
-                 |""".stripMargin).queryExecution.optimizedPlan.maxRows.contains(expected))
-          }
-        }
-      }
-    }
-  }
-
-  test("test watchdog: UNION Statement for forceMaxOutputRows") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      List("", "ALL").foreach { x =>
-        assert(sql(
-          s"""
-             |SELECT c1, c2 FROM t1
-             |UNION $x
-             |SELECT c1, c2 FROM t2
-             |UNION $x
-             |SELECT c1, c2 FROM t3
-             |""".stripMargin)
-          .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-      }
-
-      val sorts = List("", "ORDER BY cnt", "ORDER BY c1", "ORDER BY cnt, c1", "ORDER BY c1, cnt")
-      val havingConditions = List("", "HAVING cnt > 1")
-
-      List("", "ALL").foreach { x =>
-        havingConditions.foreach { having =>
-          sorts.foreach { sort =>
-            assert(sql(
-              s"""
-                 |SELECT c1, count(c2) as cnt
-                 |FROM t1
-                 |GROUP BY c1
-                 |$having
-                 |UNION $x
-                 |SELECT c1, COUNT(c2) as cnt
-                 |FROM t2
-                 |GROUP BY c1
-                 |$having
-                 |UNION $x
-                 |SELECT c1, COUNT(c2) as cnt
-                 |FROM t3
-                 |GROUP BY c1
-                 |$having
-                 |$sort
-                 |""".stripMargin)
-              .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-          }
-        }
-      }
-
-      limitAndExpecteds.foreach { case LimitAndExpected(limit, expected) =>
-        assert(sql(
-          s"""
-             |SELECT c1, c2 FROM t1
-             |UNION
-             |SELECT c1, c2 FROM t2
-             |UNION
-             |SELECT c1, c2 FROM t3
-             |LIMIT $limit
-             |""".stripMargin)
-          .queryExecution.optimizedPlan.maxRows.contains(expected))
-      }
-    }
-  }
-
-  test("test watchdog: Select View Statement for forceMaxOutputRows") {
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "3") {
-      withTable("tmp_table", "tmp_union") {
-        withView("tmp_view", "tmp_view2") {
-          sql(s"create table tmp_table (a int, b int)")
-          sql(s"insert into tmp_table values (1,10),(2,20),(3,30),(4,40),(5,50)")
-          sql(s"create table tmp_union (a int, b int)")
-          sql(s"insert into tmp_union values (6,60),(7,70),(8,80),(9,90),(10,100)")
-          sql(s"create view tmp_view2 as select * from tmp_union")
-          assert(!sql(
-            s"""
-               |CREATE VIEW tmp_view
-               |as
-               |SELECT * FROM
-               |tmp_table
-               |""".stripMargin)
-            .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-
-          assert(sql(
-            s"""
-               |SELECT * FROM
-               |tmp_view
-               |""".stripMargin)
-            .queryExecution.optimizedPlan.maxRows.contains(3))
-
-          assert(sql(
-            s"""
-               |SELECT * FROM
-               |tmp_view
-               |limit 11
-               |""".stripMargin)
-            .queryExecution.optimizedPlan.maxRows.contains(3))
-
-          assert(sql(
-            s"""
-               |SELECT * FROM
-               |(select * from tmp_view
-               |UNION
-               |select * from tmp_view2)
-               |ORDER BY a
-               |DESC
-               |""".stripMargin)
-            .collect().head.get(0) === 10)
-        }
-      }
-    }
-  }
-
-  test("test watchdog: Insert Statement for forceMaxOutputRows") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-      withTable("tmp_table", "tmp_insert") {
-        spark.sql(s"create table tmp_table (a int, b int)")
-        spark.sql(s"insert into tmp_table values (1,10),(2,20),(3,30),(4,40),(5,50)")
-        val multiInsertTableName1: String = "tmp_tbl1"
-        val multiInsertTableName2: String = "tmp_tbl2"
-        sql(s"drop table if exists $multiInsertTableName1")
-        sql(s"drop table if exists $multiInsertTableName2")
-        sql(s"create table $multiInsertTableName1 like tmp_table")
-        sql(s"create table $multiInsertTableName2 like tmp_table")
-        assert(!sql(
-          s"""
-             |FROM tmp_table
-             |insert into $multiInsertTableName1 select * limit 2
-             |insert into $multiInsertTableName2 select *
-             |""".stripMargin)
-          .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-      }
-    }
-  }
-
-  test("test watchdog: Distribute by for forceMaxOutputRows") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-      withTable("tmp_table") {
-        spark.sql(s"create table tmp_table (a int, b int)")
-        spark.sql(s"insert into tmp_table values (1,10),(2,20),(3,30),(4,40),(5,50)")
-        assert(sql(
-          s"""
-             |SELECT *
-             |FROM tmp_table
-             |DISTRIBUTE BY a
-             |""".stripMargin)
-          .queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-      }
-    }
-  }
-
-  test("test watchdog: Subquery for forceMaxOutputRows") {
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "1") {
-      withTable("tmp_table1") {
-        sql("CREATE TABLE spark_catalog.`default`.tmp_table1(KEY INT, VALUE STRING) USING PARQUET")
-        sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table1 " +
-          "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
-        assert(
-          sql("select * from tmp_table1").queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
-        val testSqlText =
-          """
-            |select count(*)
-            |from tmp_table1
-            |where tmp_table1.key in (
-            |select distinct tmp_table1.key
-            |from tmp_table1
-            |where tmp_table1.value = "aa"
-            |)
-            |""".stripMargin
-        val plan = sql(testSqlText).queryExecution.optimizedPlan
-        assert(!findGlobalLimit(plan))
-        checkAnswer(sql(testSqlText), Row(3) :: Nil)
-      }
-
-      def findGlobalLimit(plan: LogicalPlan): Boolean = plan match {
-        case _: GlobalLimit => true
-        case p if p.children.isEmpty => false
-        case p => p.children.exists(findGlobalLimit)
-      }
-
-    }
-  }
-
-  test("test watchdog: Join for forceMaxOutputRows") {
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "1") {
-      withTable("tmp_table1", "tmp_table2") {
-        sql("CREATE TABLE spark_catalog.`default`.tmp_table1(KEY INT, VALUE STRING) USING PARQUET")
-        sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table1 " +
-          "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
-        sql("CREATE TABLE spark_catalog.`default`.tmp_table2(KEY INT, VALUE STRING) USING PARQUET")
-        sql("INSERT INTO TABLE spark_catalog.`default`.tmp_table2 " +
-          "VALUES (1, 'aa'),(2,'bb'),(3, 'cc'),(4,'aa'),(5,'cc'),(6, 'aa')")
-        val testSqlText =
-          """
-            |select a.*,b.*
-            |from tmp_table1 a
-            |join
-            |tmp_table2 b
-            |on a.KEY = b.KEY
-            |""".stripMargin
-        val plan = sql(testSqlText).queryExecution.optimizedPlan
-        assert(findGlobalLimit(plan))
-      }
-
-      def findGlobalLimit(plan: LogicalPlan): Boolean = plan match {
-        case _: GlobalLimit => true
-        case p if p.children.isEmpty => false
-        case p => p.children.exists(findGlobalLimit)
-      }
-    }
-  }
-
   private def checkMaxFileSize(tableSize: Long, nonPartTableSize: Long): Unit = {
     withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> tableSize.toString) {
       checkAnswer(sql("SELECT count(distinct(p)) FROM test"), Row(10) :: Nil)
