This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/branch-1.4 by this push:
new 904d11d [KYUUBI #1443] MaxPartitionStrategy support file source based table.
904d11d is described below
commit 904d11d2394ab0c873d526f27529ab5b5021c652
Author: Fu Chen <[email protected]>
AuthorDate: Thu Nov 25 17:34:17 2021 +0800
[KYUUBI #1443] MaxPartitionStrategy support file source based table.
### _Why are the changes needed?_
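Address this [review comment](https://github.com/apache/incubator-kyuubi/pull/1086#discussion_r707103617) by extending the max-partition watchdog from Hive tables to file-source-based (data source) tables. `MaxHivePartitionStrategy` is replaced by `MaxPartitionStrategy`, and the config `spark.sql.watchdog.maxHivePartitions` is renamed to `spark.sql.watchdog.maxPartitions`.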
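The new `PruneFileSourcePartitionHelper` decides which pushed-down filters can prune partitions: a filter whose referenced columns are all partition columns is a partition filter, the rest are data filters, and the strategy then compares the pruned partition count against `spark.sql.watchdog.maxPartitions`. The Spark-free sketch below models only that split and the limit check. `Pred`, `MaxPartitionExceeded` and `PartitionFilterModel` are hypothetical names, predicates are reduced to the set of column names they reference, and the helper's extra steps (dropping non-deterministic and subquery filters, extracting partition predicates from data filters via `extractPredicatesWithinOutputSet`) are left out.

```scala
// Hypothetical, Spark-free model of the filter split in PruneFileSourcePartitionHelper.
// A predicate is reduced to the column names it references (Spark uses Expression.references).
final case class Pred(sql: String, references: Set[String])

// Hypothetical stand-in for MaxPartitionExceedException.
final class MaxPartitionExceeded(msg: String) extends RuntimeException(msg)

object PartitionFilterModel {
  /** Filters referencing only partition columns can prune partitions;
    * everything else is kept as a data filter. Order is preserved. */
  def split(filters: Seq[Pred], partitionCols: Set[String]): (Seq[Pred], Seq[Pred]) =
    filters.partition(_.references.subsetOf(partitionCols))

  /** The watchdog check: a no-op when the limit is unset, otherwise fail
    * when the pruned partition count is strictly greater than the limit. */
  def check(prunedPartitions: Int, maxPartitions: Option[Int]): Unit =
    maxPartitions.foreach { max =>
      if (prunedPartitions > max) {
        throw new MaxPartitionExceeded(
          s"scan of $prunedPartitions partitions exceeds maxPartitions $max")
      }
    }
}
```

For example, with partition column `p`, `p = 1` becomes a partition filter, while `i > 3` and `p = i` remain data filters because they reference the non-partition column `i`.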
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
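The expectations in `checkMaxPartition` follow from counting partitions: the suite writes `p = 0..9`, so each query scans as many partitions as values of `p` it matches, and the strategy fails only when that count exceeds the limit. A Spark-free sketch of that arithmetic, assuming perfect partition pruning (`WatchDogTestModel` is a hypothetical name):

```scala
// Hypothetical, Spark-free model of the WatchDogSuite expectations.
object WatchDogTestModel {
  // The suite writes one partition per value p = 0..9.
  val partitions: Seq[Int] = 0 until 10

  /** Partitions a predicate on `p` would scan, assuming perfect pruning. */
  def scanned(pred: Int => Boolean): Int = partitions.count(pred)

  /** Same rule as the strategy: fail only when the scan exceeds the limit. */
  def exceeds(pred: Int => Boolean, maxPartitions: Int): Boolean =
    scanned(pred) > maxPartitions
}
```

With a limit of 5, `p=1` (1 partition) and `p in (0,1,2,3,4)` (5) pass, while `p != 1` (9), the unfiltered scan (10) and `p in (0,1,2,3,4,5)` (6) fail; with a limit of 100, the full scan in `count(distinct(p))` passes.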
Closes #1443 from cfmcgrady/max-partition-strategy.
Closes #1443
e071ebc2 [Fu Chen] typo
f7d4ea16 [Fu Chen] refactor
97a36016 [Fu Chen] truncate RootPaths
c3fe93aa [Fu Chen] remove dpp suite
9075e5ba [Fu Chen] refactor
5a71f459 [Fu Chen] trigger GitHub actions
17df25d2 [Fu Chen] fix style
4dfe6a7e [Fu Chen] fix style
0b2df063 [Fu Chen] update docs
305c6e88 [Fu Chen] fix style
33b9b3d5 [Fu Chen] fix style
885e8ed7 [Fu Chen] data source support
Authored-by: Fu Chen <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
(cherry picked from commit 78c9cf3756e406271b032a5ae18dd55631801754)
Signed-off-by: ulysses-you <[email protected]>
---
.../kyuubi/sql/KyuubiSparkSQLExtension.scala | 4 +-
.../sql/watchdog/KyuubiWatchDogException.scala | 2 +-
.../sql/watchdog/MaxHivePartitionStrategy.scala | 81 ---------
.../kyuubi/sql/watchdog/MaxPartitionStrategy.scala | 185 +++++++++++++++++++++
.../spark/sql/PruneFileSourcePartitionHelper.scala | 46 +++++
.../scala/org/apache/spark/sql/WatchDogSuite.scala | 76 ++++++---
.../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 10 +-
docs/sql/rules.md | 2 +-
8 files changed, 290 insertions(+), 116 deletions(-)
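The `RootPaths` line of the new data source error message lists at most five root paths and summarizes the rest. A sketch of that formatting with plain strings standing in for Hadoop `Path` (`RootPathsModel` and its `limit` parameter are hypothetical; the commit hard-codes 5):

```scala
// Hypothetical model of the RootPaths truncation in maxPartitionExceedError,
// using plain strings instead of org.apache.hadoop.fs.Path.
object RootPathsModel {
  /** Join up to `limit` paths with commas, then append how many were omitted. */
  def truncate(rootPaths: Seq[String], limit: Int = 5): String =
    if (rootPaths.length > limit) {
      rootPaths.take(limit).mkString(",") + "... " + (rootPaths.length - limit) + " more paths"
    } else {
      rootPaths.mkString(",")
    }
}
```

For seven paths `/t/p=0` to `/t/p=6` this yields `/t/p=0,/t/p=1,/t/p=2,/t/p=3,/t/p=4... 2 more paths`.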
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index f5a2ade..03e736b 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification
-import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MarkAggregateOrderRule, MaxHivePartitionStrategy}
+import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MarkAggregateOrderRule, MaxPartitionStrategy}
// scalastyle:off line.size.limit
/**
@@ -40,6 +40,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
extensions.injectPostHocResolutionRule(RepartitionBeforeWritingHive)
extensions.injectPostHocResolutionRule(ForcedMaxOutputRowsRule)
- extensions.injectPlannerStrategy(MaxHivePartitionStrategy)
+ extensions.injectPlannerStrategy(MaxPartitionStrategy)
}
}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala
index e84007d..b3c58af 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql.watchdog
import org.apache.kyuubi.sql.KyuubiSQLExtensionException
-final class MaxHivePartitionExceedException(
+final class MaxPartitionExceedException(
private val reason: String = "",
private val cause: Throwable = None.orNull)
extends KyuubiSQLExtensionException(reason, cause)
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxHivePartitionStrategy.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxHivePartitionStrategy.scala
deleted file mode 100644
index 7f71584..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxHivePartitionStrategy.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
-import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/**
- * Add maxHivePartitions Strategy to avoid scan excessive hive partitions on partitioned table
- * 1 Check if scan exceed maxHivePartition
- * 2 Check if Using partitionFilter on partitioned table
- * This Strategy Add Planner Strategy after LogicalOptimizer
- */
-case class MaxHivePartitionStrategy(session: SparkSession)
- extends Strategy with SQLConfHelper {
- override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
- conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_HIVEPARTITION) match {
- case Some(maxHivePartition) => plan match {
- case ScanOperation(_, _, relation: HiveTableRelation) if relation.isPartitioned =>
- relation.prunedPartitions match {
- case Some(prunedPartitions) =>
- if (prunedPartitions.size > maxHivePartition) {
- throw new MaxHivePartitionExceedException(
- s"""
- |SQL job scan hive partition: ${prunedPartitions.size}
- |exceed restrict of hive scan maxPartition $maxHivePartition
- |You should optimize your SQL logical according partition structure
- |or shorten query scope such as p_date, detail as below:
- |Table: ${relation.tableMeta.qualifiedName}
- |Owner: ${relation.tableMeta.owner}
- |Partition Structure: ${relation.partitionCols.map(_.name).mkString(" -> ")}
- |""".stripMargin)
- } else {
- Nil
- }
- case _ =>
- val totalPartitions = session
- .sessionState.catalog.externalCatalog.listPartitionNames(
- relation.tableMeta.database,
- relation.tableMeta.identifier.table)
- if (totalPartitions.size > maxHivePartition) {
- throw new MaxHivePartitionExceedException(
- s"""
- |Your SQL job scan a whole huge table without any partition filter,
- |You should optimize your SQL logical according partition structure
- |or shorten query scope such as p_date, detail as below:
- |Table: ${relation.tableMeta.qualifiedName}
- |Owner: ${relation.tableMeta.owner}
- |Partition Structure: ${relation.partitionCols.map(_.name).mkString(" -> ")}
- |""".stripMargin)
- } else {
- Nil
- }
- }
- case _ => Nil
- }
- case _ => Nil
- }
- }
-}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala
new file mode 100644
index 0000000..d724c7e
--- /dev/null
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+/**
+ * Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table
+ * 1 Check if scan exceed maxPartition
+ * 2 Check if Using partitionFilter on partitioned table
+ * This Strategy Add Planner Strategy after LogicalOptimizer
+ */
+case class MaxPartitionStrategy(session: SparkSession)
+ extends Strategy
+ with SQLConfHelper
+ with PruneFileSourcePartitionHelper {
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+ val maxScanPartitionsOpt = conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS)
+
+ if (maxScanPartitionsOpt.isDefined) {
+ checkRelationMaxPartitions(plan, maxScanPartitionsOpt.get)
+ }
+ Nil
+ }
+
+ private def checkRelationMaxPartitions(
+ plan: LogicalPlan,
+ maxScanPartitions: Int): Unit = {
+ plan match {
+ case ScanOperation(_, _, relation: HiveTableRelation) if relation.isPartitioned =>
+ relation.prunedPartitions match {
+ case Some(prunedPartitions) =>
+ if (prunedPartitions.size > maxScanPartitions) {
+ throw new MaxPartitionExceedException(
+ s"""
+ |SQL job scan hive partition: ${prunedPartitions.size}
+ |exceed restrict of hive scan maxPartition $maxScanPartitions
+ |You should optimize your SQL logical according partition structure
+ |or shorten query scope such as p_date, detail as below:
+ |Table: ${relation.tableMeta.qualifiedName}
+ |Owner: ${relation.tableMeta.owner}
+ |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
+ |""".stripMargin)
+ }
+ case _ =>
+ val totalPartitions = session
+ .sessionState.catalog.externalCatalog.listPartitionNames(
+ relation.tableMeta.database,
+ relation.tableMeta.identifier.table)
+ if (totalPartitions.size > maxScanPartitions) {
+ throw new MaxPartitionExceedException(
+ s"""
+ |Your SQL job scan a whole huge table without any partition filter,
+ |You should optimize your SQL logical according partition structure
+ |or shorten query scope such as p_date, detail as below:
+ |Table: ${relation.tableMeta.qualifiedName}
+ |Owner: ${relation.tableMeta.owner}
+ |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
+ |""".stripMargin)
+ }
+ }
+ case ScanOperation(
+ _,
+ filters,
+ relation @ LogicalRelation(
+ fsRelation @ HadoopFsRelation(
+ fileIndex: InMemoryFileIndex,
+ partitionSchema,
+ _,
+ _,
+ _,
+ _),
+ _,
+ _,
+ _)) if fsRelation.partitionSchemaOption.isDefined =>
+ val (partitionKeyFilters, dataFilter) =
+ getPartitionKeyFiltersAndDataFilters(
+ fsRelation.sparkSession,
+ relation,
+ partitionSchema,
+ filters,
+ relation.output)
+ val prunedPartitionSize = fileIndex.listFiles(
+ partitionKeyFilters.toSeq,
+ dataFilter)
+ .size
+ if (prunedPartitionSize > maxScanPartitions) {
+ throw maxPartitionExceedError(
+ prunedPartitionSize,
+ maxScanPartitions,
+ relation.catalogTable,
+ fileIndex.rootPaths,
+ fsRelation.partitionSchema)
+ }
+ case ScanOperation(
+ _,
+ filters,
+ logicalRelation @ LogicalRelation(
+ fsRelation @ HadoopFsRelation(
+ catalogFileIndex: CatalogFileIndex,
+ partitionSchema,
+ _,
+ _,
+ _,
+ _),
+ _,
+ _,
+ _)) if fsRelation.partitionSchemaOption.isDefined =>
+ val (partitionKeyFilters, _) =
+ getPartitionKeyFiltersAndDataFilters(
+ fsRelation.sparkSession,
+ logicalRelation,
+ partitionSchema,
+ filters,
+ logicalRelation.output)
+
+ val prunedPartitionSize =
+ catalogFileIndex.filterPartitions(
+ partitionKeyFilters.toSeq)
+ .partitionSpec()
+ .partitions
+ .size
+ if (prunedPartitionSize > maxScanPartitions) {
+ throw maxPartitionExceedError(
+ prunedPartitionSize,
+ maxScanPartitions,
+ logicalRelation.catalogTable,
+ catalogFileIndex.rootPaths,
+ fsRelation.partitionSchema)
+ }
+ case _ =>
+ }
+ }
+
+ def maxPartitionExceedError(
+ prunedPartitionSize: Int,
+ maxPartitionSize: Int,
+ tableMeta: Option[CatalogTable],
+ rootPaths: Seq[Path],
+ partitionSchema: StructType): Throwable = {
+ val truncatedPaths =
+ if (rootPaths.length > 5) {
+ rootPaths.slice(0, 5).mkString(",") + """... """ + (rootPaths.length - 5) + " more paths"
+ } else {
+ rootPaths.mkString(",")
+ }
+
+ new MaxPartitionExceedException(
+ s"""
+ |SQL job scan data source partition: $prunedPartitionSize
+ |exceed restrict of data source scan maxPartition $maxPartitionSize
+ |You should optimize your SQL logical according partition structure
+ |or shorten query scope such as p_date, detail as below:
+ |Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
+ |Owner: ${tableMeta.map(_.owner).getOrElse("")}
+ |RootPaths: $truncatedPaths
+ |Partition Structure: ${partitionSchema.map(_.name).mkString(", ")}
+ |""".stripMargin)
+ }
+}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/spark/sql/PruneFileSourcePartitionHelper.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/spark/sql/PruneFileSourcePartitionHelper.scala
new file mode 100644
index 0000000..65dd016
--- /dev/null
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/spark/sql/PruneFileSourcePartitionHelper.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.types.StructType
+
+trait PruneFileSourcePartitionHelper extends PredicateHelper {
+
+ def getPartitionKeyFiltersAndDataFilters(
+ sparkSession: SparkSession,
+ relation: LeafNode,
+ partitionSchema: StructType,
+ filters: Seq[Expression],
+ output: Seq[AttributeReference]): (ExpressionSet, Seq[Expression]) = {
+ val normalizedFilters = DataSourceStrategy.normalizeExprs(
+ filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)),
+ output)
+ val partitionColumns =
+ relation.resolve(partitionSchema, sparkSession.sessionState.analyzer.resolver)
+ val partitionSet = AttributeSet(partitionColumns)
+ val (partitionFilters, dataFilters) = normalizedFilters.partition(f =>
+ f.references.subsetOf(partitionSet))
+ val extraPartitionFilter =
+ dataFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionSet))
+
+ (ExpressionSet(partitionFilters ++ extraPartitionFilter), dataFilters)
+ }
+}
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
index 7fd9e48..642bd7f 100644
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
import org.apache.kyuubi.sql.KyuubiSQLConf
-import org.apache.kyuubi.sql.watchdog.MaxHivePartitionExceedException
+import org.apache.kyuubi.sql.watchdog.MaxPartitionExceedException
class WatchDogSuite extends KyuubiSparkSQLExtensionTest {
override protected def beforeAll(): Unit = {
@@ -31,38 +31,60 @@ class WatchDogSuite extends KyuubiSparkSQLExtensionTest {
case class LimitAndExpected(limit: Int, expected: Int)
val limitAndExpecteds = List(LimitAndExpected(1, 1), LimitAndExpected(11, 10))
- test("test watchdog with scan maxHivePartitions") {
- withTable("test", "temp") {
- sql(
- s"""
- |CREATE TABLE test(i int)
- |PARTITIONED BY (p int)
- |STORED AS textfile""".stripMargin)
- spark.range(0, 10, 1).selectExpr("id as col")
- .createOrReplaceTempView("temp")
-
- for (part <- Range(0, 10)) {
- sql(
- s"""
- |INSERT OVERWRITE TABLE test PARTITION (p='$part')
- |select col from temp""".stripMargin)
- }
+ private def checkMaxPartition: Unit = {
+ withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS.key -> "100") {
+ checkAnswer(sql("SELECT count(distinct(p)) FROM test"), Row(10) :: Nil)
+ }
+ withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS.key -> "5") {
+ sql("SELECT * FROM test where p=1").queryExecution.sparkPlan
- withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_HIVEPARTITION.key -> "5") {
+ sql(s"SELECT * FROM test WHERE p in (${Range(0, 5).toList.mkString(",")})")
+ .queryExecution.sparkPlan
- sql("SELECT * FROM test where p=1").queryExecution.sparkPlan
+ intercept[MaxPartitionExceedException](
+ sql("SELECT * FROM test where p != 1").queryExecution.sparkPlan)
- sql(
- s"SELECT * FROM test WHERE p in (${Range(0, 5).toList.mkString(",")})")
- .queryExecution.sparkPlan
+ intercept[MaxPartitionExceedException](
+ sql("SELECT * FROM test").queryExecution.sparkPlan)
- intercept[MaxHivePartitionExceedException](
- sql("SELECT * FROM test").queryExecution.sparkPlan)
+ intercept[MaxPartitionExceedException](sql(
+ s"SELECT * FROM test WHERE p in (${Range(0, 6).toList.mkString(",")})")
+ .queryExecution.sparkPlan)
+ }
+ }
- intercept[MaxHivePartitionExceedException](sql(
- s"SELECT * FROM test WHERE p in (${Range(0, 6).toList.mkString(",")})")
- .queryExecution.sparkPlan)
+ test("watchdog with scan maxPartitions -- hive") {
+ Seq("textfile", "parquet").foreach { format =>
+ withTable("test", "temp") {
+ sql(
+ s"""
+ |CREATE TABLE test(i int)
+ |PARTITIONED BY (p int)
+ |STORED AS $format""".stripMargin)
+ spark.range(0, 10, 1).selectExpr("id as col")
+ .createOrReplaceTempView("temp")
+
+ for (part <- Range(0, 10)) {
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+ |select col from temp""".stripMargin)
+ }
+ checkMaxPartition
+ }
+ }
+ }
+ test("watchdog with scan maxPartitions -- data source") {
+ withTempDir { dir =>
+ withTempView("test") {
+ spark.range(10).selectExpr("id", "id as p")
+ .write
+ .partitionBy("p")
+ .mode("overwrite")
+ .save(dir.getCanonicalPath)
+ spark.read.load(dir.getCanonicalPath).createOrReplaceTempView("test")
+ checkMaxPartition
}
}
}
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index b70c039..cf63a6e 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -97,10 +97,12 @@ object KyuubiSQLConf {
.booleanConf
.createWithDefault(true)
- val WATCHDOG_MAX_HIVEPARTITION =
- buildConf("spark.sql.watchdog.maxHivePartitions")
- .doc("Add maxHivePartitions Strategy to avoid scan excessive " +
- "hive partitions on partitioned table, it's optional that works with defined")
+ val WATCHDOG_MAX_PARTITIONS =
+ buildConf("spark.sql.watchdog.maxPartitions")
+ .doc("Set the max partition number when spark scans a data source. " +
+ "Enable MaxPartitionStrategy by specifying this configuration. " +
+ "Add maxPartitions Strategy to avoid scan excessive partitions " +
+ "on partitioned table, it's optional that works with defined")
.version("1.4.0")
.intConf
.createOptional
diff --git a/docs/sql/rules.md b/docs/sql/rules.md
index 052612c..c502d84 100644
--- a/docs/sql/rules.md
+++ b/docs/sql/rules.md
@@ -76,4 +76,4 @@ spark.sql.optimizer.finalStageConfigIsolation.enabled | false | If true, the fin
spark.sql.analyzer.classification.enabled | false | When true, allows Kyuubi engine to judge this SQL's classification and set `spark.sql.analyzer.classification` back into sessionConf. Through this configuration item, Spark can optimizing configuration dynamic. | 1.4.0
spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0
spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder. | 1.4.0
-spark.sql.watchdog.maxHivePartitions | none | Add maxHivePartitions Strategy to avoid scan excessive hive partitions on partitioned table, it's optional that works with defined. | 1.4.0
+spark.sql.watchdog.maxPartitions | none | Set the max partition number when spark scans a data source. Enable MaxPartitionStrategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined | 1.4.0