This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b7d228  [KYUUBI #1123][TEST] Split KyuubiExtensionSuite according to different extensions
7b7d228 is described below

commit 7b7d228fb46f8bde48d4d0e8b0d7badb6614c063
Author: Simon <[email protected]>
AuthorDate: Wed Sep 22 14:51:58 2021 +0800

    [KYUUBI #1123][TEST] Split KyuubiExtensionSuite according to different extensions
    
    split KyuubiExtensionSuite according to different extensions
    #1123
    
    drop:
    `KyuubiSparkExtensionTest`
    
    add:
    - `FinalStageConfigIsolationSuite`
    - `InsertShuffleNodeBeforeJoinSuite`
    - `WatchDogSuite`
    - `RepartitionBeforeWriteSuite`
    - `SqlClassificationSuite`
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #1124 from simon824/extension-ut.
    
    Closes #1123
    
    4b1ae4db [Simon] Update FinalStageConfigIsolationSuite.scala
    2068a74d [Simon] Update SparkProcessBuilder.scala
    cd6aa7d2 [Simon] Merge branch 'apache:master' into extension-ut
    1401d69c [simon] rename test to suite
    98095823 [Simon] Rename WatchDogSuit.scala to WatchDogSuite.scala
    0392d5b5 [Simon] Rename SqlClassificationSuit.scala to SqlClassificationSuite.scala
    a8776df9 [Simon] Rename RepartitionBeforeWriteSuit.scala to RepartitionBeforeWriteSuite.scala
    4d06966c [Simon] Rename InsertShuffleNodeBeforeJoinSuit.scala to InsertShuffleNodeBeforeJoinSuite.scala
    60b2932e [Simon] Rename FinalStageConfigIsolationSuit.scala to FinalStageConfigIsolationSuite.scala
    175fb47f [Simon] Rename WatchDogTest.scala to WatchDogSuit.scala
    755ad22c [Simon] Rename SqlClassificationTest.scala to SqlClassificationSuit.scala
    b36171b8 [Simon] Rename RepartitionBeforeWriteTest.scala to RepartitionBeforeWriteSuit.scala
    74c110da [Simon] Rename InsertShuffleNodeBeforeJoinTest.scala to InsertShuffleNodeBeforeJoinSuit.scala
    405d8b72 [Simon] Rename FinalStageConfigIsolationTest.scala to FinalStageConfigIsolationSuit.scala
    7a158057 [simon] fix codestyle
    86148f3e [simon] fix ut
    291c93aa [simon] split KyuubiExtensionSuite to different extension ut class
    2098ce89 [simon] split KyuubiExtensionSuite to different extension ut class
    2aaed1c8 [simon] Merge remote-tracking branch 'upstream/master' into extension-ut
    
    Lead-authored-by: Simon <[email protected]>
    Co-authored-by: simon <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../spark/sql/FinalStageConfigIsolationSuite.scala | 172 ++++++++
 .../sql/InsertShuffleNodeBeforeJoinSuite.scala     |  80 ++++
 .../spark/sql/RepartitionBeforeWriteSuite.scala    | 167 +++++++
 ...ionSuite.scala => SqlClassificationSuite.scala} | 483 +--------------------
 .../scala/org/apache/spark/sql/WatchDogSuite.scala | 153 +++++++
 5 files changed, 574 insertions(+), 481 deletions(-)

diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/FinalStageConfigIsolationSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/FinalStageConfigIsolationSuite.scala
new file mode 100644
index 0000000..992105a
--- /dev/null
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/FinalStageConfigIsolationSuite.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.execution.adaptive.{CustomShuffleReaderExec, QueryStageExec}
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf}
+
+class FinalStageConfigIsolationSuite extends KyuubiSparkSQLExtensionTest {
+  test("final stage config set reset check") {
+    withSQLConf(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
+      "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum" -> "1",
+      "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "100") {
+      // use loop to double check final stage config doesn't affect the sql query each other
+      (1 to 3).foreach { _ =>
+        sql("SELECT COUNT(*) FROM VALUES(1) as t(c)").collect()
+        assert(spark.sessionState.conf.getConfString(
+          "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum") ===
+          FinalStageConfigIsolation.INTERNAL_UNSET_CONFIG_TAG)
+        assert(spark.sessionState.conf.getConfString(
+          "spark.sql.adaptive.coalescePartitions.minPartitionNum") ===
+          "1")
+        assert(spark.sessionState.conf.getConfString(
+          "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum") ===
+          "1")
+
+        // 64MB
+        assert(spark.sessionState.conf.getConfString(
+          "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes") ===
+          "67108864b")
+        assert(spark.sessionState.conf.getConfString(
+          "spark.sql.adaptive.advisoryPartitionSizeInBytes") ===
+          "100")
+        assert(spark.sessionState.conf.getConfString(
+          "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes") ===
+          "100")
+      }
+
+      sql("SET spark.sql.adaptive.advisoryPartitionSizeInBytes=1")
+      assert(spark.sessionState.conf.getConfString(
+        "spark.sql.adaptive.advisoryPartitionSizeInBytes") ===
+        "1")
+      assert(!spark.sessionState.conf.contains(
+        "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes"))
+
+      sql("SET a=1")
+      assert(spark.sessionState.conf.getConfString("a") === "1")
+
+      sql("RESET spark.sql.adaptive.coalescePartitions.minPartitionNum")
+      assert(!spark.sessionState.conf.contains(
+        "spark.sql.adaptive.coalescePartitions.minPartitionNum"))
+      assert(!spark.sessionState.conf.contains(
+        "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum"))
+
+      sql("RESET a")
+      assert(!spark.sessionState.conf.contains("a"))
+    }
+  }
+
+  test("final stage config isolation") {
+    def checkPartitionNum(sqlString: String, previousPartitionNum: Int,
+                          finalPartitionNum: Int): Unit = {
+      val df = sql(sqlString)
+      df.collect()
+      val shuffleReaders = collect(df.queryExecution.executedPlan) {
+        case customShuffleReader: CustomShuffleReaderExec => customShuffleReader
+      }
+      assert(shuffleReaders.nonEmpty)
+      // reorder stage by stage id to ensure we get the right stage
+      val sortedShuffleReaders = shuffleReaders.sortWith {
+        case (s1, s2) =>
+          s1.child.asInstanceOf[QueryStageExec].id < s2.child.asInstanceOf[QueryStageExec].id
+      }
+      if (sortedShuffleReaders.length > 1) {
+        assert(sortedShuffleReaders.head.partitionSpecs.length === previousPartitionNum)
+      }
+      assert(sortedShuffleReaders.last.partitionSpecs.length === finalPartitionNum)
+      assert(df.rdd.partitions.length === finalPartitionNum)
+    }
+
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "3",
+      KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
+      "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "1",
+      "spark.sql.adaptive.coalescePartitions.minPartitionSize" -> "1",
+      "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "10000000") {
+
+      // use loop to double check final stage config doesn't affect the sql query each other
+      (1 to 3).foreach { _ =>
+        checkPartitionNum(
+          "SELECT c1, count(*) FROM t1 GROUP BY c1",
+          1,
+          1)
+
+        checkPartitionNum(
+          "SELECT c2, count(*) FROM (SELECT c1, count(*) as c2 FROM t1 GROUP BY c1) GROUP BY c2",
+          3,
+          1)
+
+        checkPartitionNum(
+          "SELECT t1.c1, count(*) FROM t1 JOIN t2 ON t1.c2 = t2.c2 GROUP BY t1.c1",
+          3,
+          1)
+
+        checkPartitionNum(
+          """
+            | SELECT /*+ REPARTITION */
+            | t1.c1, count(*) FROM t1
+            | JOIN t2 ON t1.c2 = t2.c2
+            | JOIN t3 ON t1.c1 = t3.c1
+            | GROUP BY t1.c1
+            |""".stripMargin,
+          3,
+          1)
+
+        // one shuffle reader
+        checkPartitionNum(
+          """
+            | SELECT /*+ BROADCAST(t1) */
+            | t1.c1, t2.c2 FROM t1
+            | JOIN t2 ON t1.c2 = t2.c2
+            | DISTRIBUTE BY c1
+            |""".stripMargin,
+          1,
+          1)
+
+        // test ReusedExchange
+        checkPartitionNum(
+          """
+            |SELECT /*+ REPARTITION */ t0.c2 FROM (
+            |SELECT t1.c1, (count(*) + c1) as c2 FROM t1 GROUP BY t1.c1
+            |) t0 JOIN (
+            |SELECT t1.c1, (count(*) + c1) as c2 FROM t1 GROUP BY t1.c1
+            |) t1 ON t0.c2 = t1.c2
+            |""".stripMargin,
+          3,
+          1
+        )
+
+        // one shuffle reader
+        checkPartitionNum(
+          """
+            |SELECT t0.c1 FROM (
+            |SELECT t1.c1 FROM t1 GROUP BY t1.c1
+            |) t0 JOIN (
+            |SELECT t1.c1 FROM t1 GROUP BY t1.c1
+            |) t1 ON t0.c1 = t1.c1
+            |""".stripMargin,
+          1,
+          1
+        )
+      }
+    }
+  }
+}
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
new file mode 100644
index 0000000..939fea8
--- /dev/null
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike}
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+class InsertShuffleNodeBeforeJoinSuite extends KyuubiSparkSQLExtensionTest {
+  test("force shuffle before join") {
+    def checkShuffleNodeNum(sqlString: String, num: Int): Unit = {
+      var expectedResult: Seq[Row] = Seq.empty
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+        expectedResult = sql(sqlString).collect()
+      }
+      val df = sql(sqlString)
+      checkAnswer(df, expectedResult)
+      assert(
+        collect(df.queryExecution.executedPlan) {
+          case shuffle: ShuffleExchangeLike
+            if shuffle.shuffleOrigin == ENSURE_REQUIREMENTS => shuffle
+        }.size == num)
+    }
+
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      KyuubiSQLConf.FORCE_SHUFFLE_BEFORE_JOIN.key -> "true") {
+      Seq("SHUFFLE_HASH", "MERGE").foreach { joinHint =>
+        // positive case
+        checkShuffleNodeNum(
+          s"""
+             |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
+             | JOIN t2 ON t1.c1 = t2.c1
+             | JOIN t3 ON t1.c1 = t3.c1
+             | """.stripMargin, 4)
+
+        // negative case
+        checkShuffleNodeNum(
+          s"""
+             |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
+             | JOIN t2 ON t1.c1 = t2.c1
+             | JOIN t3 ON t1.c2 = t3.c2
+             | """.stripMargin, 4)
+      }
+
+      checkShuffleNodeNum(
+        """
+          |SELECT t1.c1, t2.c1, t3.c2 from t1
+          | JOIN t2 ON t1.c1 = t2.c1
+          | JOIN (
+          |  SELECT c2, count(*) FROM t1 GROUP BY c2
+          | ) t3 ON t1.c1 = t3.c2
+          | """.stripMargin, 5)
+
+      checkShuffleNodeNum(
+        """
+          |SELECT t1.c1, t2.c1, t3.c1 from t1
+          | JOIN t2 ON t1.c1 = t2.c1
+          | JOIN (
+          |  SELECT c1, count(*) FROM t1 GROUP BY c1
+          | ) t3 ON t1.c1 = t3.c1
+          | """.stripMargin, 5)
+    }
+  }
+}
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWriteSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWriteSuite.scala
new file mode 100644
index 0000000..239afff
--- /dev/null
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWriteSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Multiply}
+import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+class RepartitionBeforeWriteSuite extends KyuubiSparkSQLExtensionTest {
+  test("check repartition exists") {
+    def check(df: DataFrame): Unit = {
+      assert(
+        df.queryExecution.analyzed.collect {
+          case r: RepartitionByExpression =>
+            assert(r.optNumPartitions ===
+              spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
+            r
+        }.size == 1
+      )
+    }
+
+    // It's better to set config explicitly in case of we change the default value.
+    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
+      Seq("USING PARQUET", "").foreach { storage =>
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
+          check(sql("INSERT INTO TABLE tmp1 PARTITION(c2='a') " +
+            "SELECT * FROM VALUES(1),(2) AS t(c1)"))
+        }
+
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+          check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
+          check(sql("INSERT INTO TABLE tmp1 " +
+            "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1"))
+        }
+
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
+        }
+
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
+            s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")
+        }
+
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
+            s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) DISTRIBUTE BY rand()")
+        }
+      }
+    }
+  }
+
+  test("check repartition does not exists") {
+    def check(df: DataFrame): Unit = {
+      assert(
+        df.queryExecution.analyzed.collect {
+          case r: RepartitionByExpression => r
+        }.isEmpty
+      )
+    }
+
+    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
+      // test no write command
+      check(sql("SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
+      check(sql("SELECT count(*) FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
+
+      // test not supported plan
+      withTable("tmp1") {
+        sql(s"CREATE TABLE tmp1 (c1 int) PARTITIONED BY (c2 string)")
+        check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
+          "SELECT /*+ repartition(10) */ * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
+        check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
+          "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) ORDER BY c1"))
+        check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
+          "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) LIMIT 10"))
+      }
+    }
+
+    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "false") {
+      Seq("USING PARQUET", "").foreach { storage =>
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
+          check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
+            "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
+        }
+
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+          check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
+        }
+
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
+        }
+
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
+            s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")
+        }
+      }
+    }
+  }
+
+  test("test dynamic partition write") {
+    def checkRepartitionExpression(df: DataFrame): Unit = {
+      assert(df.queryExecution.analyzed.collect {
+        case r: RepartitionByExpression if r.partitionExpressions.size == 2 =>
+          assert(r.partitionExpressions.head.asInstanceOf[Attribute].name === "c2")
+          assert(r.partitionExpressions(1).asInstanceOf[Cast].child.asInstanceOf[Multiply]
+            .left.asInstanceOf[Attribute].name.startsWith("_nondeterministic"))
+          r
+      }.size == 1)
+    }
+
+    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
+      KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2") {
+      Seq("USING PARQUET", "").foreach { storage =>
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
+          checkRepartitionExpression(sql("INSERT INTO TABLE tmp1 SELECT 1 as c1, 'a' as c2 "))
+        }
+
+        withTable("tmp1") {
+          checkRepartitionExpression(
+            sql("CREATE TABLE tmp1 PARTITIONED BY(C2) SELECT 1 as c1, 'a' as c2 "))
+        }
+      }
+    }
+  }
+
+  test("OptimizedCreateHiveTableAsSelectCommand") {
+    withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true",
+      HiveUtils.CONVERT_METASTORE_CTAS.key -> "true") {
+      withTable("t") {
+        val df = sql(s"CREATE TABLE t STORED AS parquet AS SELECT 1 as a")
+        val ctas = df.queryExecution.analyzed.collect {
+          case _: OptimizedCreateHiveTableAsSelectCommand => true
+        }
+        assert(ctas.size == 1)
+        val repartition = df.queryExecution.analyzed.collect {
+          case _: RepartitionByExpression => true
+        }
+        assert(repartition.size == 1)
+      }
+    }
+  }
+}
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/SqlClassificationSuite.scala
similarity index 65%
rename from dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
rename to dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/SqlClassificationSuite.scala
index c94b33f..7ff0024 100644
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/SqlClassificationSuite.scala
@@ -19,361 +19,9 @@ package org.apache.spark.sql
 
 import scala.collection.mutable.Set
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Multiply}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, RepartitionByExpression}
-import org.apache.spark.sql.execution.adaptive.{CustomShuffleReaderExec, QueryStageExec}
-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike}
-import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand
-import org.apache.spark.sql.internal.SQLConf
-
-import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf}
-import org.apache.kyuubi.sql.watchdog.MaxHivePartitionExceedException
-
-class KyuubiExtensionSuite extends KyuubiSparkSQLExtensionTest {
-
-  test("check repartition exists") {
-    def check(df: DataFrame): Unit = {
-      assert(
-        df.queryExecution.analyzed.collect {
-          case r: RepartitionByExpression =>
-            assert(r.optNumPartitions ===
-              spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
-            r
-        }.size == 1
-      )
-    }
-
-    // It's better to set config explicitly in case of we change the default value.
-    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
-      Seq("USING PARQUET", "").foreach { storage =>
-        withTable("tmp1") {
-          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
-          check(sql("INSERT INTO TABLE tmp1 PARTITION(c2='a') " +
-            "SELECT * FROM VALUES(1),(2) AS t(c1)"))
-        }
-
-        withTable("tmp1") {
-          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
-          check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
-          check(sql("INSERT INTO TABLE tmp1 " +
-            "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1"))
-        }
-
-        withTable("tmp1") {
-          sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
-        }
-
-        withTable("tmp1") {
-          sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
-            s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")
-        }
-
-        withTable("tmp1") {
-          sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
-            s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) DISTRIBUTE BY rand()")
-        }
-      }
-    }
-  }
-
-  test("check repartition does not exists") {
-    def check(df: DataFrame): Unit = {
-      assert(
-        df.queryExecution.analyzed.collect {
-          case r: RepartitionByExpression => r
-        }.isEmpty
-      )
-    }
-
-    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
-      // test no write command
-      check(sql("SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
-      check(sql("SELECT count(*) FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
-
-      // test not supported plan
-      withTable("tmp1") {
-        sql(s"CREATE TABLE tmp1 (c1 int) PARTITIONED BY (c2 string)")
-        check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
-          "SELECT /*+ repartition(10) */ * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
-        check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
-          "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) ORDER BY c1"))
-        check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
-          "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) LIMIT 10"))
-      }
-    }
-
-    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "false") {
-      Seq("USING PARQUET", "").foreach { storage =>
-        withTable("tmp1") {
-          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
-          check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
-            "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
-        }
-
-        withTable("tmp1") {
-          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
-          check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
-        }
-
-        withTable("tmp1") {
-          sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
-        }
-
-        withTable("tmp1") {
-          sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
-            s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")
-        }
-      }
-    }
-  }
-
-  test("test dynamic partition write") {
-    def checkRepartitionExpression(df: DataFrame): Unit = {
-      assert(df.queryExecution.analyzed.collect {
-        case r: RepartitionByExpression if r.partitionExpressions.size == 2 =>
-          assert(r.partitionExpressions.head.asInstanceOf[Attribute].name === "c2")
-          assert(r.partitionExpressions(1).asInstanceOf[Cast].child.asInstanceOf[Multiply]
-            .left.asInstanceOf[Attribute].name.startsWith("_nondeterministic"))
-          r
-      }.size == 1)
-    }
-
-    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
-      KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2") {
-      Seq("USING PARQUET", "").foreach { storage =>
-        withTable("tmp1") {
-          sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
-          checkRepartitionExpression(sql("INSERT INTO TABLE tmp1 SELECT 1 as c1, 'a' as c2 "))
-        }
-
-        withTable("tmp1") {
-          checkRepartitionExpression(
-            sql("CREATE TABLE tmp1 PARTITIONED BY(C2) SELECT 1 as c1, 'a' as c2 "))
-        }
-      }
-    }
-  }
-
-  test("force shuffle before join") {
-    def checkShuffleNodeNum(sqlString: String, num: Int): Unit = {
-      var expectedResult: Seq[Row] = Seq.empty
-      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-        expectedResult = sql(sqlString).collect()
-      }
-      val df = sql(sqlString)
-      checkAnswer(df, expectedResult)
-      assert(
-        collect(df.queryExecution.executedPlan) {
-          case shuffle: ShuffleExchangeLike
-            if shuffle.shuffleOrigin == ENSURE_REQUIREMENTS => shuffle
-        }.size == num)
-    }
-
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      KyuubiSQLConf.FORCE_SHUFFLE_BEFORE_JOIN.key -> "true") {
-      Seq("SHUFFLE_HASH", "MERGE").foreach { joinHint =>
-        // positive case
-        checkShuffleNodeNum(
-          s"""
-            |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
-            | JOIN t2 ON t1.c1 = t2.c1
-            | JOIN t3 ON t1.c1 = t3.c1
-            | """.stripMargin, 4)
-
-        // negative case
-        checkShuffleNodeNum(
-          s"""
-             |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
-             | JOIN t2 ON t1.c1 = t2.c1
-             | JOIN t3 ON t1.c2 = t3.c2
-             | """.stripMargin, 4)
-      }
-
-      checkShuffleNodeNum(
-        """
-          |SELECT t1.c1, t2.c1, t3.c2 from t1
-          | JOIN t2 ON t1.c1 = t2.c1
-          | JOIN (
-          |  SELECT c2, count(*) FROM t1 GROUP BY c2
-          | ) t3 ON t1.c1 = t3.c2
-          | """.stripMargin, 5)
-
-      checkShuffleNodeNum(
-        """
-          |SELECT t1.c1, t2.c1, t3.c1 from t1
-          | JOIN t2 ON t1.c1 = t2.c1
-          | JOIN (
-          |  SELECT c1, count(*) FROM t1 GROUP BY c1
-          | ) t3 ON t1.c1 = t3.c1
-          | """.stripMargin, 5)
-    }
-  }
-
-  test("final stage config set reset check") {
-    withSQLConf(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
-      "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum" -> "1",
-      "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "100") {
-      // use loop to double check final stage config doesn't affect the sql query each other
-      (1 to 3).foreach { _ =>
-        sql("SELECT COUNT(*) FROM VALUES(1) as t(c)").collect()
-        assert(spark.sessionState.conf.getConfString(
-          "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum") ===
-          FinalStageConfigIsolation.INTERNAL_UNSET_CONFIG_TAG)
-        assert(spark.sessionState.conf.getConfString(
-          "spark.sql.adaptive.coalescePartitions.minPartitionNum") ===
-          "1")
-        assert(spark.sessionState.conf.getConfString(
-          "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum") ===
-          "1")
-
-        // 64MB
-        assert(spark.sessionState.conf.getConfString(
-          "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes") ===
-          "67108864b")
-        assert(spark.sessionState.conf.getConfString(
-          "spark.sql.adaptive.advisoryPartitionSizeInBytes") ===
-          "100")
-        assert(spark.sessionState.conf.getConfString(
-          "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes") ===
-          "100")
-      }
-
-      sql("SET spark.sql.adaptive.advisoryPartitionSizeInBytes=1")
-      assert(spark.sessionState.conf.getConfString(
-        "spark.sql.adaptive.advisoryPartitionSizeInBytes") ===
-        "1")
-      assert(!spark.sessionState.conf.contains(
-        "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes"))
-
-      sql("SET a=1")
-      assert(spark.sessionState.conf.getConfString("a") === "1")
-
-      sql("RESET spark.sql.adaptive.coalescePartitions.minPartitionNum")
-      assert(!spark.sessionState.conf.contains(
-        "spark.sql.adaptive.coalescePartitions.minPartitionNum"))
-      assert(!spark.sessionState.conf.contains(
-        "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum"))
-
-      sql("RESET a")
-      assert(!spark.sessionState.conf.contains("a"))
-    }
-  }
-
-  test("final stage config isolation") {
-    def checkPartitionNum(
-        sqlString: String, previousPartitionNum: Int, finalPartitionNum: Int): Unit = {
-      val df = sql(sqlString)
-      df.collect()
-      val shuffleReaders = collect(df.queryExecution.executedPlan) {
-        case customShuffleReader: CustomShuffleReaderExec => customShuffleReader
-      }
-      assert(shuffleReaders.nonEmpty)
-      // reorder stage by stage id to ensure we get the right stage
-      val sortedShuffleReaders = shuffleReaders.sortWith {
-        case (s1, s2) =>
-          s1.child.asInstanceOf[QueryStageExec].id < s2.child.asInstanceOf[QueryStageExec].id
-      }
-      if (sortedShuffleReaders.length > 1) {
-        assert(sortedShuffleReaders.head.partitionSpecs.length === previousPartitionNum)
-      }
-      assert(sortedShuffleReaders.last.partitionSpecs.length === finalPartitionNum)
-      assert(df.rdd.partitions.length === finalPartitionNum)
-    }
-
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "3",
-      KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
-      "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "1",
-      "spark.sql.adaptive.coalescePartitions.minPartitionSize" -> "1",
-      "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "10000000") {
-
-      // use loop to double check final stage config doesn't affect the sql query each other
-      (1 to 3).foreach { _ =>
-        checkPartitionNum(
-          "SELECT c1, count(*) FROM t1 GROUP BY c1",
-          1,
-          1)
-
-        checkPartitionNum(
-          "SELECT c2, count(*) FROM (SELECT c1, count(*) as c2 FROM t1 GROUP BY c1) GROUP BY c2",
-          3,
-          1)
-
-        checkPartitionNum(
-          "SELECT t1.c1, count(*) FROM t1 JOIN t2 ON t1.c2 = t2.c2 GROUP BY t1.c1",
-          3,
-          1)
-
-        checkPartitionNum(
-          """
-            | SELECT /*+ REPARTITION */
-            | t1.c1, count(*) FROM t1
-            | JOIN t2 ON t1.c2 = t2.c2
-            | JOIN t3 ON t1.c1 = t3.c1
-            | GROUP BY t1.c1
-            |""".stripMargin,
-          3,
-          1)
-
-        // one shuffle reader
-        checkPartitionNum(
-          """
-            | SELECT /*+ BROADCAST(t1) */
-            | t1.c1, t2.c2 FROM t1
-            | JOIN t2 ON t1.c2 = t2.c2
-            | DISTRIBUTE BY c1
-            |""".stripMargin,
-          1,
-          1)
-
-        // test ReusedExchange
-        checkPartitionNum(
-          """
-            |SELECT /*+ REPARTITION */ t0.c2 FROM (
-            |SELECT t1.c1, (count(*) + c1) as c2 FROM t1 GROUP BY t1.c1
-            |) t0 JOIN (
-            |SELECT t1.c1, (count(*) + c1) as c2 FROM t1 GROUP BY t1.c1
-            |) t1 ON t0.c2 = t1.c2
-            |""".stripMargin,
-          3,
-          1
-        )
-
-        // one shuffle reader
-        checkPartitionNum(
-          """
-            |SELECT t0.c1 FROM (
-            |SELECT t1.c1 FROM t1 GROUP BY t1.c1
-            |) t0 JOIN (
-            |SELECT t1.c1 FROM t1 GROUP BY t1.c1
-            |) t1 ON t0.c1 = t1.c1
-            |""".stripMargin,
-          1,
-          1
-        )
-      }
-    }
-  }
-
-  test("OptimizedCreateHiveTableAsSelectCommand") {
-    withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true",
-      HiveUtils.CONVERT_METASTORE_CTAS.key -> "true") {
-      withTable("t") {
-        val df = sql(s"CREATE TABLE t STORED AS parquet AS SELECT 1 as a")
-        val ctas = df.queryExecution.analyzed.collect {
-          case _: OptimizedCreateHiveTableAsSelectCommand => true
-        }
-        assert(ctas.size == 1)
-        val repartition = df.queryExecution.analyzed.collect {
-          case _: RepartitionByExpression => true
-        }
-        assert(repartition.size == 1)
-      }
-    }
-  }
+import org.apache.kyuubi.sql.KyuubiSQLConf
 
+class SqlClassificationSuite extends KyuubiSparkSQLExtensionTest {
   test("Sql classification for ddl") {
     withSQLConf(KyuubiSQLConf.SQL_CLASSIFICATION_ENABLED.key -> "true") {
       withDatabase("inventory") {
@@ -1268,131 +916,4 @@ class KyuubiExtensionSuite extends 
KyuubiSparkSQLExtensionTest {
     println("auxiliary statement simple name is :" + 
auxiStatementSimpleName.toSeq.sorted)
     // scalastyle:on println
   }
-  test("test watchdog with scan maxHivePartitions") {
-    withTable("test", "temp") {
-      sql(
-        s"""
-           |CREATE TABLE test(i int)
-           |PARTITIONED BY (p int)
-           |STORED AS textfile""".stripMargin)
-      spark.range(0, 10, 1).selectExpr("id as col")
-        .createOrReplaceTempView("temp")
-
-      for (part <- Range(0, 10)) {
-        sql(
-          s"""
-             |INSERT OVERWRITE TABLE test PARTITION (p='$part')
-             |select col from temp""".stripMargin)
-      }
-
-      withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_HIVEPARTITION.key -> "5") {
-
-        sql("SELECT * FROM test where p=1").queryExecution.sparkPlan
-
-        sql(
-          s"SELECT * FROM test WHERE p in (${Range(0, 
5).toList.mkString(",")})")
-          .queryExecution.sparkPlan
-
-        intercept[MaxHivePartitionExceedException](
-          sql("SELECT * FROM test").queryExecution.sparkPlan)
-
-        intercept[MaxHivePartitionExceedException](sql(
-          s"SELECT * FROM test WHERE p in (${Range(0, 
6).toList.mkString(",")})")
-          .queryExecution.sparkPlan)
-
-      }
-    }
-  }
-
-  test("test watchdog with query forceMaxOutputRows") {
-
-    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
-      assert(sql("SELECT * FROM t1")
-        .queryExecution.analyzed.isInstanceOf[GlobalLimit])
-
-      assert(sql("SELECT * FROM t1 LIMIT 1")
-        .queryExecution.analyzed.asInstanceOf[GlobalLimit].maxRows.contains(1))
-
-      assert(sql("SELECT * FROM t1 LIMIT 11")
-        
.queryExecution.analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
-
-      assert(!sql("SELECT count(*) FROM t1")
-        .queryExecution.analyzed.isInstanceOf[GlobalLimit])
-
-      assert(sql(
-        """
-          |SELECT c1, COUNT(*)
-          |FROM t1
-          |GROUP BY c1
-          |""".stripMargin).queryExecution.analyzed.isInstanceOf[GlobalLimit])
-
-      assert(sql(
-        """
-          |WITH custom_cte AS (
-          |SELECT * FROM t1
-          |)
-          |
-          |SELECT * FROM custom_cte
-          |""".stripMargin).queryExecution
-        .analyzed.isInstanceOf[GlobalLimit])
-
-      assert(sql(
-        """
-          |WITH custom_cte AS (
-          |SELECT * FROM t1
-          |)
-          |
-          |SELECT * FROM custom_cte
-          |LIMIT 1
-          |""".stripMargin).queryExecution
-        .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(1))
-
-      assert(sql(
-        """
-          |WITH custom_cte AS (
-          |SELECT * FROM t1
-          |)
-          |
-          |SELECT * FROM custom_cte
-          |LIMIT 11
-          |""".stripMargin).queryExecution
-        .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
-
-      assert(!sql(
-        """
-          |WITH custom_cte AS (
-          |SELECT * FROM t1
-          |)
-          |
-          |SELECT COUNT(*) FROM custom_cte
-          |""".stripMargin).queryExecution
-        .analyzed.isInstanceOf[GlobalLimit])
-
-      assert(sql(
-        """
-          |WITH custom_cte AS (
-          |SELECT * FROM t1
-          |)
-          |
-          |SELECT c1, COUNT(*)
-          |FROM custom_cte
-          |GROUP BY c1
-          |""".stripMargin).queryExecution
-        .analyzed.isInstanceOf[GlobalLimit])
-
-      assert(sql(
-        """
-          |WITH custom_cte AS (
-          |SELECT * FROM t1
-          |)
-          |
-          |SELECT c1, COUNT(*)
-          |FROM custom_cte
-          |GROUP BY c1
-          |LIMIT 11
-          |""".stripMargin).queryExecution
-        .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
-    }
-  }
 }
diff --git 
a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
 
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
new file mode 100644
index 0000000..483a2bf
--- /dev/null
+++ 
b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.sql.watchdog.MaxHivePartitionExceedException
+
+class WatchDogSuite extends KyuubiSparkSQLExtensionTest {
+  test("test watchdog with scan maxHivePartitions") {
+    withTable("test", "temp") {
+      sql(
+        s"""
+           |CREATE TABLE test(i int)
+           |PARTITIONED BY (p int)
+           |STORED AS textfile""".stripMargin)
+      spark.range(0, 10, 1).selectExpr("id as col")
+        .createOrReplaceTempView("temp")
+
+      for (part <- Range(0, 10)) {
+        sql(
+          s"""
+             |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+             |select col from temp""".stripMargin)
+      }
+
+      withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_HIVEPARTITION.key -> "5") {
+
+        sql("SELECT * FROM test where p=1").queryExecution.sparkPlan
+
+        sql(
+          s"SELECT * FROM test WHERE p in (${Range(0, 
5).toList.mkString(",")})")
+          .queryExecution.sparkPlan
+
+        intercept[MaxHivePartitionExceedException](
+          sql("SELECT * FROM test").queryExecution.sparkPlan)
+
+        intercept[MaxHivePartitionExceedException](sql(
+          s"SELECT * FROM test WHERE p in (${Range(0, 
6).toList.mkString(",")})")
+          .queryExecution.sparkPlan)
+
+      }
+    }
+  }
+
+  test("test watchdog with query forceMaxOutputRows") {
+
+    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
+
+      assert(sql("SELECT * FROM t1")
+        .queryExecution.analyzed.isInstanceOf[GlobalLimit])
+
+      assert(sql("SELECT * FROM t1 LIMIT 1")
+        .queryExecution.analyzed.asInstanceOf[GlobalLimit].maxRows.contains(1))
+
+      assert(sql("SELECT * FROM t1 LIMIT 11")
+        
.queryExecution.analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
+
+      assert(!sql("SELECT count(*) FROM t1")
+        .queryExecution.analyzed.isInstanceOf[GlobalLimit])
+
+      assert(sql(
+        """
+          |SELECT c1, COUNT(*)
+          |FROM t1
+          |GROUP BY c1
+          |""".stripMargin).queryExecution.analyzed.isInstanceOf[GlobalLimit])
+
+      assert(sql(
+        """
+          |WITH custom_cte AS (
+          |SELECT * FROM t1
+          |)
+          |
+          |SELECT * FROM custom_cte
+          |""".stripMargin).queryExecution
+        .analyzed.isInstanceOf[GlobalLimit])
+
+      assert(sql(
+        """
+          |WITH custom_cte AS (
+          |SELECT * FROM t1
+          |)
+          |
+          |SELECT * FROM custom_cte
+          |LIMIT 1
+          |""".stripMargin).queryExecution
+        .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(1))
+
+      assert(sql(
+        """
+          |WITH custom_cte AS (
+          |SELECT * FROM t1
+          |)
+          |
+          |SELECT * FROM custom_cte
+          |LIMIT 11
+          |""".stripMargin).queryExecution
+        .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
+
+      assert(!sql(
+        """
+          |WITH custom_cte AS (
+          |SELECT * FROM t1
+          |)
+          |
+          |SELECT COUNT(*) FROM custom_cte
+          |""".stripMargin).queryExecution
+        .analyzed.isInstanceOf[GlobalLimit])
+
+      assert(sql(
+        """
+          |WITH custom_cte AS (
+          |SELECT * FROM t1
+          |)
+          |
+          |SELECT c1, COUNT(*)
+          |FROM custom_cte
+          |GROUP BY c1
+          |""".stripMargin).queryExecution
+        .analyzed.isInstanceOf[GlobalLimit])
+
+      assert(sql(
+        """
+          |WITH custom_cte AS (
+          |SELECT * FROM t1
+          |)
+          |
+          |SELECT c1, COUNT(*)
+          |FROM custom_cte
+          |GROUP BY c1
+          |LIMIT 11
+          |""".stripMargin).queryExecution
+        .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
+    }
+  }
+}

Reply via email to