This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 7b7d228 [KYUUBI #1123][TEST] Split KyuubiExtensionSuite according to different extensions
7b7d228 is described below
commit 7b7d228fb46f8bde48d4d0e8b0d7badb6614c063
Author: Simon <[email protected]>
AuthorDate: Wed Sep 22 14:51:58 2021 +0800
[KYUUBI #1123][TEST] Split KyuubiExtensionSuite according to different extensions
Split `KyuubiExtensionSuite` according to different extensions.
#1123
drop:
- `KyuubiExtensionSuite`
add:
- `FinalStageConfigIsolationSuite`
- `InsertShuffleNodeBeforeJoinSuite`
- `WatchDogSuite`
- `RepartitionBeforeWriteSuite`
- `SqlClassificationSuite`
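Each new suite extends the shared `KyuubiSparkSQLExtensionTest` base and exercises exactly one extension, so a failure now points at a single rule. A minimal sketch of the shape, condensed from the `WatchDogSuite` added in the diff below:

```scala
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit

import org.apache.kyuubi.sql.KyuubiSQLConf

// One suite per extension: only the rule under test is toggled on.
class WatchDogSuite extends KyuubiSparkSQLExtensionTest {
  test("watchdog forces a maximum number of output rows") {
    // t1 is assumed to be provisioned by the shared test base.
    withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
      // The extension should wrap the analyzed plan in a GlobalLimit node.
      assert(sql("SELECT * FROM t1")
        .queryExecution.analyzed.isInstanceOf[GlobalLimit])
    }
  }
}
```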
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
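One practical upside of the split is that a single extension's tests can now be run in isolation. A hypothetical invocation via the scalatest-maven-plugin (the module path is taken from the file paths in the diff below; the `build/mvn` wrapper and the plugin flags are assumptions, not part of this commit):

```bash
# Run only the watchdog suite in the Spark 3.1 extension module (illustrative).
build/mvn test -pl dev/kyuubi-extension-spark-3-1 \
  -Dtest=none -DwildcardSuites=org.apache.spark.sql.WatchDogSuite
```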
Closes #1124 from simon824/extension-ut.
Closes #1123
4b1ae4db [Simon] Update FinalStageConfigIsolationSuite.scala
2068a74d [Simon] Update SparkProcessBuilder.scala
cd6aa7d2 [Simon] Merge branch 'apache:master' into extension-ut
1401d69c [simon] rename test to suite
98095823 [Simon] Rename WatchDogSuit.scala to WatchDogSuite.scala
0392d5b5 [Simon] Rename SqlClassificationSuit.scala to SqlClassificationSuite.scala
a8776df9 [Simon] Rename RepartitionBeforeWriteSuit.scala to RepartitionBeforeWriteSuite.scala
4d06966c [Simon] Rename InsertShuffleNodeBeforeJoinSuit.scala to InsertShuffleNodeBeforeJoinSuite.scala
60b2932e [Simon] Rename FinalStageConfigIsolationSuit.scala to FinalStageConfigIsolationSuite.scala
175fb47f [Simon] Rename WatchDogTest.scala to WatchDogSuit.scala
755ad22c [Simon] Rename SqlClassificationTest.scala to SqlClassificationSuit.scala
b36171b8 [Simon] Rename RepartitionBeforeWriteTest.scala to RepartitionBeforeWriteSuit.scala
74c110da [Simon] Rename InsertShuffleNodeBeforeJoinTest.scala to InsertShuffleNodeBeforeJoinSuit.scala
405d8b72 [Simon] Rename FinalStageConfigIsolationTest.scala to FinalStageConfigIsolationSuit.scala
7a158057 [simon] fix codestyle
86148f3e [simon] fix ut
291c93aa [simon] split KyuubiExtensionSuite to different extension ut class
2098ce89 [simon] split KyuubiExtensionSuite to different extension ut class
2aaed1c8 [simon] Merge remote-tracking branch 'upstream/master' into extension-ut
Lead-authored-by: Simon <[email protected]>
Co-authored-by: simon <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../spark/sql/FinalStageConfigIsolationSuite.scala | 172 ++++++++
.../sql/InsertShuffleNodeBeforeJoinSuite.scala | 80 ++++
.../spark/sql/RepartitionBeforeWriteSuite.scala | 167 +++++++
...ionSuite.scala => SqlClassificationSuite.scala} | 483 +--------------------
.../scala/org/apache/spark/sql/WatchDogSuite.scala | 153 +++++++
5 files changed, 574 insertions(+), 481 deletions(-)
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/FinalStageConfigIsolationSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/FinalStageConfigIsolationSuite.scala
new file mode 100644
index 0000000..992105a
--- /dev/null
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/FinalStageConfigIsolationSuite.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.execution.adaptive.{CustomShuffleReaderExec, QueryStageExec}
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf}
+
+class FinalStageConfigIsolationSuite extends KyuubiSparkSQLExtensionTest {
+ test("final stage config set reset check") {
+ withSQLConf(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
+      "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum" -> "1",
+ "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "100") {
+      // run in a loop to verify that final stage configs from one query do not affect the next
+ (1 to 3).foreach { _ =>
+ sql("SELECT COUNT(*) FROM VALUES(1) as t(c)").collect()
+ assert(spark.sessionState.conf.getConfString(
+          "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum") ===
+ FinalStageConfigIsolation.INTERNAL_UNSET_CONFIG_TAG)
+ assert(spark.sessionState.conf.getConfString(
+ "spark.sql.adaptive.coalescePartitions.minPartitionNum") ===
+ "1")
+ assert(spark.sessionState.conf.getConfString(
+          "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum") ===
+ "1")
+
+ // 64MB
+ assert(spark.sessionState.conf.getConfString(
+ "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes") ===
+ "67108864b")
+ assert(spark.sessionState.conf.getConfString(
+ "spark.sql.adaptive.advisoryPartitionSizeInBytes") ===
+ "100")
+ assert(spark.sessionState.conf.getConfString(
+ "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes") ===
+ "100")
+ }
+
+ sql("SET spark.sql.adaptive.advisoryPartitionSizeInBytes=1")
+ assert(spark.sessionState.conf.getConfString(
+ "spark.sql.adaptive.advisoryPartitionSizeInBytes") ===
+ "1")
+ assert(!spark.sessionState.conf.contains(
+ "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes"))
+
+ sql("SET a=1")
+ assert(spark.sessionState.conf.getConfString("a") === "1")
+
+ sql("RESET spark.sql.adaptive.coalescePartitions.minPartitionNum")
+ assert(!spark.sessionState.conf.contains(
+ "spark.sql.adaptive.coalescePartitions.minPartitionNum"))
+ assert(!spark.sessionState.conf.contains(
+ "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum"))
+
+ sql("RESET a")
+ assert(!spark.sessionState.conf.contains("a"))
+ }
+ }
+
+ test("final stage config isolation") {
+    def checkPartitionNum(sqlString: String, previousPartitionNum: Int, finalPartitionNum: Int): Unit = {
+ val df = sql(sqlString)
+ df.collect()
+ val shuffleReaders = collect(df.queryExecution.executedPlan) {
+        case customShuffleReader: CustomShuffleReaderExec => customShuffleReader
+ }
+ assert(shuffleReaders.nonEmpty)
+      // reorder stages by stage id to ensure we get the right stage
+ val sortedShuffleReaders = shuffleReaders.sortWith {
+ case (s1, s2) =>
+          s1.child.asInstanceOf[QueryStageExec].id < s2.child.asInstanceOf[QueryStageExec].id
+ }
+ if (sortedShuffleReaders.length > 1) {
+        assert(sortedShuffleReaders.head.partitionSpecs.length === previousPartitionNum)
+ }
+      assert(sortedShuffleReaders.last.partitionSpecs.length === finalPartitionNum)
+ assert(df.rdd.partitions.length === finalPartitionNum)
+ }
+
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+ SQLConf.SHUFFLE_PARTITIONS.key -> "3",
+ KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
+ "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "1",
+ "spark.sql.adaptive.coalescePartitions.minPartitionSize" -> "1",
+      "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "10000000") {
+
+      // run in a loop to verify that final stage configs from one query do not affect the next
+ (1 to 3).foreach { _ =>
+ checkPartitionNum(
+ "SELECT c1, count(*) FROM t1 GROUP BY c1",
+ 1,
+ 1)
+
+ checkPartitionNum(
+          "SELECT c2, count(*) FROM (SELECT c1, count(*) as c2 FROM t1 GROUP BY c1) GROUP BY c2",
+ 3,
+ 1)
+
+ checkPartitionNum(
+          "SELECT t1.c1, count(*) FROM t1 JOIN t2 ON t1.c2 = t2.c2 GROUP BY t1.c1",
+ 3,
+ 1)
+
+ checkPartitionNum(
+ """
+ | SELECT /*+ REPARTITION */
+ | t1.c1, count(*) FROM t1
+ | JOIN t2 ON t1.c2 = t2.c2
+ | JOIN t3 ON t1.c1 = t3.c1
+ | GROUP BY t1.c1
+ |""".stripMargin,
+ 3,
+ 1)
+
+ // one shuffle reader
+ checkPartitionNum(
+ """
+ | SELECT /*+ BROADCAST(t1) */
+ | t1.c1, t2.c2 FROM t1
+ | JOIN t2 ON t1.c2 = t2.c2
+ | DISTRIBUTE BY c1
+ |""".stripMargin,
+ 1,
+ 1)
+
+ // test ReusedExchange
+ checkPartitionNum(
+ """
+ |SELECT /*+ REPARTITION */ t0.c2 FROM (
+ |SELECT t1.c1, (count(*) + c1) as c2 FROM t1 GROUP BY t1.c1
+ |) t0 JOIN (
+ |SELECT t1.c1, (count(*) + c1) as c2 FROM t1 GROUP BY t1.c1
+ |) t1 ON t0.c2 = t1.c2
+ |""".stripMargin,
+ 3,
+ 1
+ )
+
+ // one shuffle reader
+ checkPartitionNum(
+ """
+ |SELECT t0.c1 FROM (
+ |SELECT t1.c1 FROM t1 GROUP BY t1.c1
+ |) t0 JOIN (
+ |SELECT t1.c1 FROM t1 GROUP BY t1.c1
+ |) t1 ON t0.c1 = t1.c1
+ |""".stripMargin,
+ 1,
+ 1
+ )
+ }
+ }
+ }
+}
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
new file mode 100644
index 0000000..939fea8
--- /dev/null
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/InsertShuffleNodeBeforeJoinSuite.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike}
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+class InsertShuffleNodeBeforeJoinSuite extends KyuubiSparkSQLExtensionTest {
+ test("force shuffle before join") {
+ def checkShuffleNodeNum(sqlString: String, num: Int): Unit = {
+ var expectedResult: Seq[Row] = Seq.empty
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ expectedResult = sql(sqlString).collect()
+ }
+ val df = sql(sqlString)
+ checkAnswer(df, expectedResult)
+ assert(
+ collect(df.queryExecution.executedPlan) {
+ case shuffle: ShuffleExchangeLike
+ if shuffle.shuffleOrigin == ENSURE_REQUIREMENTS => shuffle
+ }.size == num)
+ }
+
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ KyuubiSQLConf.FORCE_SHUFFLE_BEFORE_JOIN.key -> "true") {
+ Seq("SHUFFLE_HASH", "MERGE").foreach { joinHint =>
+ // positive case
+ checkShuffleNodeNum(
+ s"""
+          |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
+ | JOIN t2 ON t1.c1 = t2.c1
+ | JOIN t3 ON t1.c1 = t3.c1
+ | """.stripMargin, 4)
+
+ // negative case
+ checkShuffleNodeNum(
+ s"""
+          |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
+ | JOIN t2 ON t1.c1 = t2.c1
+ | JOIN t3 ON t1.c2 = t3.c2
+ | """.stripMargin, 4)
+ }
+
+ checkShuffleNodeNum(
+ """
+ |SELECT t1.c1, t2.c1, t3.c2 from t1
+ | JOIN t2 ON t1.c1 = t2.c1
+ | JOIN (
+ | SELECT c2, count(*) FROM t1 GROUP BY c2
+ | ) t3 ON t1.c1 = t3.c2
+ | """.stripMargin, 5)
+
+ checkShuffleNodeNum(
+ """
+ |SELECT t1.c1, t2.c1, t3.c1 from t1
+ | JOIN t2 ON t1.c1 = t2.c1
+ | JOIN (
+ | SELECT c1, count(*) FROM t1 GROUP BY c1
+ | ) t3 ON t1.c1 = t3.c1
+ | """.stripMargin, 5)
+ }
+ }
+}
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWriteSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWriteSuite.scala
new file mode 100644
index 0000000..239afff
--- /dev/null
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWriteSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Multiply}
+import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+class RepartitionBeforeWriteSuite extends KyuubiSparkSQLExtensionTest {
+ test("check repartition exists") {
+ def check(df: DataFrame): Unit = {
+ assert(
+ df.queryExecution.analyzed.collect {
+ case r: RepartitionByExpression =>
+ assert(r.optNumPartitions ===
+            spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
+ r
+ }.size == 1
+ )
+ }
+
+    // Set the config explicitly in case the default value changes.
+ withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
+ Seq("USING PARQUET", "").foreach { storage =>
+ withTable("tmp1") {
+        sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
+ check(sql("INSERT INTO TABLE tmp1 PARTITION(c2='a') " +
+ "SELECT * FROM VALUES(1),(2) AS t(c1)"))
+ }
+
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+        check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
+ check(sql("INSERT INTO TABLE tmp1 " +
+ "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1"))
+ }
+
+ withTable("tmp1") {
+        sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
+ }
+
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
+ s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")
+ }
+
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
+          s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) DISTRIBUTE BY rand()")
+ }
+ }
+ }
+ }
+
+  test("check repartition does not exist") {
+ def check(df: DataFrame): Unit = {
+ assert(
+ df.queryExecution.analyzed.collect {
+ case r: RepartitionByExpression => r
+ }.isEmpty
+ )
+ }
+
+ withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
+ // test no write command
+ check(sql("SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
+ check(sql("SELECT count(*) FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
+
+ // test not supported plan
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) PARTITIONED BY (c2 string)")
+ check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
+        "SELECT /*+ repartition(10) */ * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
+ check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
+ "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) ORDER BY c1"))
+ check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
+ "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) LIMIT 10"))
+ }
+ }
+
+ withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "false") {
+ Seq("USING PARQUET", "").foreach { storage =>
+ withTable("tmp1") {
+        sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
+ check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
+ "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
+ }
+
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+        check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
+ }
+
+ withTable("tmp1") {
+        sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
+ }
+
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
+ s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")
+ }
+ }
+ }
+ }
+
+ test("test dynamic partition write") {
+ def checkRepartitionExpression(df: DataFrame): Unit = {
+ assert(df.queryExecution.analyzed.collect {
+ case r: RepartitionByExpression if r.partitionExpressions.size == 2 =>
+        assert(r.partitionExpressions.head.asInstanceOf[Attribute].name === "c2")
+        assert(r.partitionExpressions(1).asInstanceOf[Cast].child.asInstanceOf[Multiply]
+ .left.asInstanceOf[Attribute].name.startsWith("_nondeterministic"))
+ r
+ }.size == 1)
+ }
+
+ withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
+ KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2") {
+ Seq("USING PARQUET", "").foreach { storage =>
+ withTable("tmp1") {
+        sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
+        checkRepartitionExpression(sql("INSERT INTO TABLE tmp1 SELECT 1 as c1, 'a' as c2 "))
+ }
+
+ withTable("tmp1") {
+ checkRepartitionExpression(
+          sql("CREATE TABLE tmp1 PARTITIONED BY(C2) SELECT 1 as c1, 'a' as c2 "))
+ }
+ }
+ }
+ }
+
+ test("OptimizedCreateHiveTableAsSelectCommand") {
+ withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true",
+ HiveUtils.CONVERT_METASTORE_CTAS.key -> "true") {
+ withTable("t") {
+ val df = sql(s"CREATE TABLE t STORED AS parquet AS SELECT 1 as a")
+ val ctas = df.queryExecution.analyzed.collect {
+ case _: OptimizedCreateHiveTableAsSelectCommand => true
+ }
+ assert(ctas.size == 1)
+ val repartition = df.queryExecution.analyzed.collect {
+ case _: RepartitionByExpression => true
+ }
+ assert(repartition.size == 1)
+ }
+ }
+ }
+}
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/SqlClassificationSuite.scala
similarity index 65%
rename from dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
rename to dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/SqlClassificationSuite.scala
index c94b33f..7ff0024 100644
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiExtensionSuite.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/SqlClassificationSuite.scala
@@ -19,361 +19,9 @@ package org.apache.spark.sql
import scala.collection.mutable.Set
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Multiply}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, RepartitionByExpression}
-import org.apache.spark.sql.execution.adaptive.{CustomShuffleReaderExec, QueryStageExec}
-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike}
-import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand
-import org.apache.spark.sql.internal.SQLConf
-
-import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf}
-import org.apache.kyuubi.sql.watchdog.MaxHivePartitionExceedException
-
-class KyuubiExtensionSuite extends KyuubiSparkSQLExtensionTest {
-
- test("check repartition exists") {
- def check(df: DataFrame): Unit = {
- assert(
- df.queryExecution.analyzed.collect {
- case r: RepartitionByExpression =>
- assert(r.optNumPartitions ===
-            spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
- r
- }.size == 1
- )
- }
-
-    // It's better to set config explicitly in case of we change the default value.
- withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
- Seq("USING PARQUET", "").foreach { storage =>
- withTable("tmp1") {
-        sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
- check(sql("INSERT INTO TABLE tmp1 PARTITION(c2='a') " +
- "SELECT * FROM VALUES(1),(2) AS t(c1)"))
- }
-
- withTable("tmp1") {
- sql(s"CREATE TABLE tmp1 (c1 int) $storage")
-        check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
- check(sql("INSERT INTO TABLE tmp1 " +
- "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1"))
- }
-
- withTable("tmp1") {
-        sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
- }
-
- withTable("tmp1") {
- sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
- s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")
- }
-
- withTable("tmp1") {
- sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
-          s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) DISTRIBUTE BY rand()")
- }
- }
- }
- }
-
- test("check repartition does not exists") {
- def check(df: DataFrame): Unit = {
- assert(
- df.queryExecution.analyzed.collect {
- case r: RepartitionByExpression => r
- }.isEmpty
- )
- }
-
- withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
- // test no write command
- check(sql("SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
- check(sql("SELECT count(*) FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
-
- // test not supported plan
- withTable("tmp1") {
- sql(s"CREATE TABLE tmp1 (c1 int) PARTITIONED BY (c2 string)")
- check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
-        "SELECT /*+ repartition(10) */ * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
- check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
- "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) ORDER BY c1"))
- check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
- "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2) LIMIT 10"))
- }
- }
-
- withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "false") {
- Seq("USING PARQUET", "").foreach { storage =>
- withTable("tmp1") {
-        sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
- check(sql("INSERT INTO TABLE tmp1 PARTITION(c2) " +
- "SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)"))
- }
-
- withTable("tmp1") {
- sql(s"CREATE TABLE tmp1 (c1 int) $storage")
-        check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
- }
-
- withTable("tmp1") {
-        sql(s"CREATE TABLE tmp1 $storage AS SELECT * FROM VALUES(1),(2),(3) AS t(c1)")
- }
-
- withTable("tmp1") {
- sql(s"CREATE TABLE tmp1 $storage PARTITIONED BY(c2) AS " +
- s"SELECT * FROM VALUES(1, 'a'),(2, 'b') AS t(c1, c2)")
- }
- }
- }
- }
-
- test("test dynamic partition write") {
- def checkRepartitionExpression(df: DataFrame): Unit = {
- assert(df.queryExecution.analyzed.collect {
- case r: RepartitionByExpression if r.partitionExpressions.size == 2 =>
-        assert(r.partitionExpressions.head.asInstanceOf[Attribute].name === "c2")
-        assert(r.partitionExpressions(1).asInstanceOf[Cast].child.asInstanceOf[Multiply]
- .left.asInstanceOf[Attribute].name.startsWith("_nondeterministic"))
- r
- }.size == 1)
- }
-
- withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
- KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2") {
- Seq("USING PARQUET", "").foreach { storage =>
- withTable("tmp1") {
-        sql(s"CREATE TABLE tmp1 (c1 int) $storage PARTITIONED BY (c2 string)")
-        checkRepartitionExpression(sql("INSERT INTO TABLE tmp1 SELECT 1 as c1, 'a' as c2 "))
- }
-
- withTable("tmp1") {
- checkRepartitionExpression(
-          sql("CREATE TABLE tmp1 PARTITIONED BY(C2) SELECT 1 as c1, 'a' as c2 "))
- }
- }
- }
- }
-
- test("force shuffle before join") {
- def checkShuffleNodeNum(sqlString: String, num: Int): Unit = {
- var expectedResult: Seq[Row] = Seq.empty
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
- expectedResult = sql(sqlString).collect()
- }
- val df = sql(sqlString)
- checkAnswer(df, expectedResult)
- assert(
- collect(df.queryExecution.executedPlan) {
- case shuffle: ShuffleExchangeLike
- if shuffle.shuffleOrigin == ENSURE_REQUIREMENTS => shuffle
- }.size == num)
- }
-
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
- KyuubiSQLConf.FORCE_SHUFFLE_BEFORE_JOIN.key -> "true") {
- Seq("SHUFFLE_HASH", "MERGE").foreach { joinHint =>
- // positive case
- checkShuffleNodeNum(
- s"""
- |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
- | JOIN t2 ON t1.c1 = t2.c1
- | JOIN t3 ON t1.c1 = t3.c1
- | """.stripMargin, 4)
-
- // negative case
- checkShuffleNodeNum(
- s"""
-          |SELECT /*+ $joinHint(t2, t3) */ t1.c1, t1.c2, t2.c1, t3.c1 from t1
- | JOIN t2 ON t1.c1 = t2.c1
- | JOIN t3 ON t1.c2 = t3.c2
- | """.stripMargin, 4)
- }
-
- checkShuffleNodeNum(
- """
- |SELECT t1.c1, t2.c1, t3.c2 from t1
- | JOIN t2 ON t1.c1 = t2.c1
- | JOIN (
- | SELECT c2, count(*) FROM t1 GROUP BY c2
- | ) t3 ON t1.c1 = t3.c2
- | """.stripMargin, 5)
-
- checkShuffleNodeNum(
- """
- |SELECT t1.c1, t2.c1, t3.c1 from t1
- | JOIN t2 ON t1.c1 = t2.c1
- | JOIN (
- | SELECT c1, count(*) FROM t1 GROUP BY c1
- | ) t3 ON t1.c1 = t3.c1
- | """.stripMargin, 5)
- }
- }
-
- test("final stage config set reset check") {
- withSQLConf(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
-      "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum" -> "1",
- "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "100") {
-      // use loop to double check final stage config doesn't affect the sql query each other
- (1 to 3).foreach { _ =>
- sql("SELECT COUNT(*) FROM VALUES(1) as t(c)").collect()
- assert(spark.sessionState.conf.getConfString(
-          "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum") ===
- FinalStageConfigIsolation.INTERNAL_UNSET_CONFIG_TAG)
- assert(spark.sessionState.conf.getConfString(
- "spark.sql.adaptive.coalescePartitions.minPartitionNum") ===
- "1")
- assert(spark.sessionState.conf.getConfString(
-          "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum") ===
- "1")
-
- // 64MB
- assert(spark.sessionState.conf.getConfString(
- "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes") ===
- "67108864b")
- assert(spark.sessionState.conf.getConfString(
- "spark.sql.adaptive.advisoryPartitionSizeInBytes") ===
- "100")
- assert(spark.sessionState.conf.getConfString(
- "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes") ===
- "100")
- }
-
- sql("SET spark.sql.adaptive.advisoryPartitionSizeInBytes=1")
- assert(spark.sessionState.conf.getConfString(
- "spark.sql.adaptive.advisoryPartitionSizeInBytes") ===
- "1")
- assert(!spark.sessionState.conf.contains(
- "spark.sql.previousStage.adaptive.advisoryPartitionSizeInBytes"))
-
- sql("SET a=1")
- assert(spark.sessionState.conf.getConfString("a") === "1")
-
- sql("RESET spark.sql.adaptive.coalescePartitions.minPartitionNum")
- assert(!spark.sessionState.conf.contains(
- "spark.sql.adaptive.coalescePartitions.minPartitionNum"))
- assert(!spark.sessionState.conf.contains(
- "spark.sql.previousStage.adaptive.coalescePartitions.minPartitionNum"))
-
- sql("RESET a")
- assert(!spark.sessionState.conf.contains("a"))
- }
- }
-
- test("final stage config isolation") {
- def checkPartitionNum(
-        sqlString: String, previousPartitionNum: Int, finalPartitionNum: Int): Unit = {
- val df = sql(sqlString)
- df.collect()
- val shuffleReaders = collect(df.queryExecution.executedPlan) {
-        case customShuffleReader: CustomShuffleReaderExec => customShuffleReader
- }
- assert(shuffleReaders.nonEmpty)
- // reorder stage by stage id to ensure we get the right stage
- val sortedShuffleReaders = shuffleReaders.sortWith {
- case (s1, s2) =>
-          s1.child.asInstanceOf[QueryStageExec].id < s2.child.asInstanceOf[QueryStageExec].id
- }
- if (sortedShuffleReaders.length > 1) {
-        assert(sortedShuffleReaders.head.partitionSpecs.length === previousPartitionNum)
- }
-      assert(sortedShuffleReaders.last.partitionSpecs.length === finalPartitionNum)
- assert(df.rdd.partitions.length === finalPartitionNum)
- }
-
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
- SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
- SQLConf.SHUFFLE_PARTITIONS.key -> "3",
- KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
- "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "1",
- "spark.sql.adaptive.coalescePartitions.minPartitionSize" -> "1",
-      "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "10000000") {
-
-      // use loop to double check final stage config doesn't affect the sql query each other
- (1 to 3).foreach { _ =>
- checkPartitionNum(
- "SELECT c1, count(*) FROM t1 GROUP BY c1",
- 1,
- 1)
-
- checkPartitionNum(
-          "SELECT c2, count(*) FROM (SELECT c1, count(*) as c2 FROM t1 GROUP BY c1) GROUP BY c2",
- 3,
- 1)
-
- checkPartitionNum(
-          "SELECT t1.c1, count(*) FROM t1 JOIN t2 ON t1.c2 = t2.c2 GROUP BY t1.c1",
- 3,
- 1)
-
- checkPartitionNum(
- """
- | SELECT /*+ REPARTITION */
- | t1.c1, count(*) FROM t1
- | JOIN t2 ON t1.c2 = t2.c2
- | JOIN t3 ON t1.c1 = t3.c1
- | GROUP BY t1.c1
- |""".stripMargin,
- 3,
- 1)
-
- // one shuffle reader
- checkPartitionNum(
- """
- | SELECT /*+ BROADCAST(t1) */
- | t1.c1, t2.c2 FROM t1
- | JOIN t2 ON t1.c2 = t2.c2
- | DISTRIBUTE BY c1
- |""".stripMargin,
- 1,
- 1)
-
- // test ReusedExchange
- checkPartitionNum(
- """
- |SELECT /*+ REPARTITION */ t0.c2 FROM (
- |SELECT t1.c1, (count(*) + c1) as c2 FROM t1 GROUP BY t1.c1
- |) t0 JOIN (
- |SELECT t1.c1, (count(*) + c1) as c2 FROM t1 GROUP BY t1.c1
- |) t1 ON t0.c2 = t1.c2
- |""".stripMargin,
- 3,
- 1
- )
-
- // one shuffle reader
- checkPartitionNum(
- """
- |SELECT t0.c1 FROM (
- |SELECT t1.c1 FROM t1 GROUP BY t1.c1
- |) t0 JOIN (
- |SELECT t1.c1 FROM t1 GROUP BY t1.c1
- |) t1 ON t0.c1 = t1.c1
- |""".stripMargin,
- 1,
- 1
- )
- }
- }
- }
-
- test("OptimizedCreateHiveTableAsSelectCommand") {
- withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true",
- HiveUtils.CONVERT_METASTORE_CTAS.key -> "true") {
- withTable("t") {
- val df = sql(s"CREATE TABLE t STORED AS parquet AS SELECT 1 as a")
- val ctas = df.queryExecution.analyzed.collect {
- case _: OptimizedCreateHiveTableAsSelectCommand => true
- }
- assert(ctas.size == 1)
- val repartition = df.queryExecution.analyzed.collect {
- case _: RepartitionByExpression => true
- }
- assert(repartition.size == 1)
- }
- }
- }
+import org.apache.kyuubi.sql.KyuubiSQLConf
+class SqlClassificationSuite extends KyuubiSparkSQLExtensionTest {
test("Sql classification for ddl") {
withSQLConf(KyuubiSQLConf.SQL_CLASSIFICATION_ENABLED.key -> "true") {
withDatabase("inventory") {
@@ -1268,131 +916,4 @@ class KyuubiExtensionSuite extends KyuubiSparkSQLExtensionTest {
println("auxiliary statement simple name is :" +
auxiStatementSimpleName.toSeq.sorted)
// scalastyle:on println
}
- test("test watchdog with scan maxHivePartitions") {
- withTable("test", "temp") {
- sql(
- s"""
- |CREATE TABLE test(i int)
- |PARTITIONED BY (p int)
- |STORED AS textfile""".stripMargin)
- spark.range(0, 10, 1).selectExpr("id as col")
- .createOrReplaceTempView("temp")
-
- for (part <- Range(0, 10)) {
- sql(
- s"""
- |INSERT OVERWRITE TABLE test PARTITION (p='$part')
- |select col from temp""".stripMargin)
- }
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_HIVEPARTITION.key -> "5") {
-
- sql("SELECT * FROM test where p=1").queryExecution.sparkPlan
-
- sql(
-        s"SELECT * FROM test WHERE p in (${Range(0, 5).toList.mkString(",")})")
- .queryExecution.sparkPlan
-
- intercept[MaxHivePartitionExceedException](
- sql("SELECT * FROM test").queryExecution.sparkPlan)
-
- intercept[MaxHivePartitionExceedException](sql(
-        s"SELECT * FROM test WHERE p in (${Range(0, 6).toList.mkString(",")})")
- .queryExecution.sparkPlan)
-
- }
- }
- }
-
- test("test watchdog with query forceMaxOutputRows") {
-
- withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
-
- assert(sql("SELECT * FROM t1")
- .queryExecution.analyzed.isInstanceOf[GlobalLimit])
-
- assert(sql("SELECT * FROM t1 LIMIT 1")
- .queryExecution.analyzed.asInstanceOf[GlobalLimit].maxRows.contains(1))
-
- assert(sql("SELECT * FROM t1 LIMIT 11")
-        .queryExecution.analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
-
- assert(!sql("SELECT count(*) FROM t1")
- .queryExecution.analyzed.isInstanceOf[GlobalLimit])
-
- assert(sql(
- """
- |SELECT c1, COUNT(*)
- |FROM t1
- |GROUP BY c1
- |""".stripMargin).queryExecution.analyzed.isInstanceOf[GlobalLimit])
-
- assert(sql(
- """
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT * FROM custom_cte
- |""".stripMargin).queryExecution
- .analyzed.isInstanceOf[GlobalLimit])
-
- assert(sql(
- """
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT * FROM custom_cte
- |LIMIT 1
- |""".stripMargin).queryExecution
- .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(1))
-
- assert(sql(
- """
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT * FROM custom_cte
- |LIMIT 11
- |""".stripMargin).queryExecution
- .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
-
- assert(!sql(
- """
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT COUNT(*) FROM custom_cte
- |""".stripMargin).queryExecution
- .analyzed.isInstanceOf[GlobalLimit])
-
- assert(sql(
- """
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT c1, COUNT(*)
- |FROM custom_cte
- |GROUP BY c1
- |""".stripMargin).queryExecution
- .analyzed.isInstanceOf[GlobalLimit])
-
- assert(sql(
- """
- |WITH custom_cte AS (
- |SELECT * FROM t1
- |)
- |
- |SELECT c1, COUNT(*)
- |FROM custom_cte
- |GROUP BY c1
- |LIMIT 11
- |""".stripMargin).queryExecution
- .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
- }
- }
}
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
new file mode 100644
index 0000000..483a2bf
--- /dev/null
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.sql.watchdog.MaxHivePartitionExceedException
+
+class WatchDogSuite extends KyuubiSparkSQLExtensionTest {
+ test("test watchdog with scan maxHivePartitions") {
+ withTable("test", "temp") {
+ sql(
+ s"""
+ |CREATE TABLE test(i int)
+ |PARTITIONED BY (p int)
+ |STORED AS textfile""".stripMargin)
+ spark.range(0, 10, 1).selectExpr("id as col")
+ .createOrReplaceTempView("temp")
+
+ for (part <- Range(0, 10)) {
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+ |select col from temp""".stripMargin)
+ }
+
+ withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_HIVEPARTITION.key -> "5") {
+
+ sql("SELECT * FROM test where p=1").queryExecution.sparkPlan
+
+ sql(
+        s"SELECT * FROM test WHERE p in (${Range(0, 5).toList.mkString(",")})")
+ .queryExecution.sparkPlan
+
+ intercept[MaxHivePartitionExceedException](
+ sql("SELECT * FROM test").queryExecution.sparkPlan)
+
+ intercept[MaxHivePartitionExceedException](sql(
+        s"SELECT * FROM test WHERE p in (${Range(0, 6).toList.mkString(",")})")
+ .queryExecution.sparkPlan)
+
+ }
+ }
+ }
+
+ test("test watchdog with query forceMaxOutputRows") {
+
+ withSQLConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS.key -> "10") {
+
+ assert(sql("SELECT * FROM t1")
+ .queryExecution.analyzed.isInstanceOf[GlobalLimit])
+
+ assert(sql("SELECT * FROM t1 LIMIT 1")
+ .queryExecution.analyzed.asInstanceOf[GlobalLimit].maxRows.contains(1))
+
+ assert(sql("SELECT * FROM t1 LIMIT 11")
+        .queryExecution.analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
+
+ assert(!sql("SELECT count(*) FROM t1")
+ .queryExecution.analyzed.isInstanceOf[GlobalLimit])
+
+ assert(sql(
+ """
+ |SELECT c1, COUNT(*)
+ |FROM t1
+ |GROUP BY c1
+ |""".stripMargin).queryExecution.analyzed.isInstanceOf[GlobalLimit])
+
+ assert(sql(
+ """
+ |WITH custom_cte AS (
+ |SELECT * FROM t1
+ |)
+ |
+ |SELECT * FROM custom_cte
+ |""".stripMargin).queryExecution
+ .analyzed.isInstanceOf[GlobalLimit])
+
+ assert(sql(
+ """
+ |WITH custom_cte AS (
+ |SELECT * FROM t1
+ |)
+ |
+ |SELECT * FROM custom_cte
+ |LIMIT 1
+ |""".stripMargin).queryExecution
+ .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(1))
+
+ assert(sql(
+ """
+ |WITH custom_cte AS (
+ |SELECT * FROM t1
+ |)
+ |
+ |SELECT * FROM custom_cte
+ |LIMIT 11
+ |""".stripMargin).queryExecution
+ .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
+
+ assert(!sql(
+ """
+ |WITH custom_cte AS (
+ |SELECT * FROM t1
+ |)
+ |
+ |SELECT COUNT(*) FROM custom_cte
+ |""".stripMargin).queryExecution
+ .analyzed.isInstanceOf[GlobalLimit])
+
+ assert(sql(
+ """
+ |WITH custom_cte AS (
+ |SELECT * FROM t1
+ |)
+ |
+ |SELECT c1, COUNT(*)
+ |FROM custom_cte
+ |GROUP BY c1
+ |""".stripMargin).queryExecution
+ .analyzed.isInstanceOf[GlobalLimit])
+
+ assert(sql(
+ """
+ |WITH custom_cte AS (
+ |SELECT * FROM t1
+ |)
+ |
+ |SELECT c1, COUNT(*)
+ |FROM custom_cte
+ |GROUP BY c1
+ |LIMIT 11
+ |""".stripMargin).queryExecution
+ .analyzed.asInstanceOf[GlobalLimit].maxRows.contains(10))
+ }
+ }
+}