This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 78c9cf3  [KYUUBI #1443] MaxPartitionStrategy support file source based table.
78c9cf3 is described below

commit 78c9cf3756e406271b032a5ae18dd55631801754
Author: Fu Chen <[email protected]>
AuthorDate: Thu Nov 25 17:34:17 2021 +0800

    [KYUUBI #1443] MaxPartitionStrategy support file source based table.
    
    ### _Why are the changes needed?_
    Fix [comment](https://github.com/apache/incubator-kyuubi/pull/1086#discussion_r707103617)
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1443 from cfmcgrady/max-partition-strategy.
    
    Closes #1443
    
    e071ebc2 [Fu Chen] typo
    f7d4ea16 [Fu Chen] refactor
    97a36016 [Fu Chen] truncate RootPaths
    c3fe93aa [Fu Chen] remove dpp suite
    9075e5ba [Fu Chen] refactor
    5a71f459 [Fu Chen] trigger GitHub actions
    17df25d2 [Fu Chen] fix style
    4dfe6a7e [Fu Chen] fix style
    0b2df063 [Fu Chen] update docs
    305c6e88 [Fu Chen] fix style
    33b9b3d5 [Fu Chen] fix style
    885e8ed7 [Fu Chen] data source support
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../kyuubi/sql/KyuubiSparkSQLExtension.scala       |   4 +-
 .../sql/watchdog/KyuubiWatchDogException.scala     |   2 +-
 .../sql/watchdog/MaxHivePartitionStrategy.scala    |  81 ---------
 .../kyuubi/sql/watchdog/MaxPartitionStrategy.scala | 185 +++++++++++++++++++++
 .../spark/sql/PruneFileSourcePartitionHelper.scala |  46 +++++
 .../scala/org/apache/spark/sql/WatchDogSuite.scala |  76 ++++++---
 .../org/apache/kyuubi/sql/KyuubiSQLConf.scala      |  10 +-
 docs/sql/rules.md                                  |   2 +-
 8 files changed, 290 insertions(+), 116 deletions(-)

diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index f5a2ade..03e736b 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.sql
 import org.apache.spark.sql.SparkSessionExtensions
 
 import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification
-import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MarkAggregateOrderRule, MaxHivePartitionStrategy}
+import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, MarkAggregateOrderRule, MaxPartitionStrategy}
 
 // scalastyle:off line.size.limit
 /**
@@ -40,6 +40,6 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
     extensions.injectPostHocResolutionRule(RepartitionBeforeWritingHive)
     extensions.injectPostHocResolutionRule(ForcedMaxOutputRowsRule)
 
-    extensions.injectPlannerStrategy(MaxHivePartitionStrategy)
+    extensions.injectPlannerStrategy(MaxPartitionStrategy)
   }
 }
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala
index e84007d..b3c58af 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiWatchDogException.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql.watchdog
 
 import org.apache.kyuubi.sql.KyuubiSQLExtensionException
 
-final class MaxHivePartitionExceedException(
+final class MaxPartitionExceedException(
     private val reason: String = "",
     private val cause: Throwable = None.orNull)
   extends KyuubiSQLExtensionException(reason, cause)
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxHivePartitionStrategy.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxHivePartitionStrategy.scala
deleted file mode 100644
index 7f71584..0000000
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxHivePartitionStrategy.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.sql.watchdog
-
-import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
-import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
-
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
-/**
- * Add maxHivePartitions Strategy to avoid scan excessive hive partitions on partitioned table
- * 1 Check if scan exceed maxHivePartition
- * 2 Check if Using partitionFilter on partitioned table
- * This Strategy Add Planner Strategy after LogicalOptimizer
- */
-case class MaxHivePartitionStrategy(session: SparkSession)
-  extends Strategy with SQLConfHelper {
-  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
-    conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_HIVEPARTITION) match {
-      case Some(maxHivePartition) => plan match {
-          case ScanOperation(_, _, relation: HiveTableRelation) if relation.isPartitioned =>
-            relation.prunedPartitions match {
-              case Some(prunedPartitions) =>
-                if (prunedPartitions.size > maxHivePartition) {
-                  throw new MaxHivePartitionExceedException(
-                    s"""
-                       |SQL job scan hive partition: ${prunedPartitions.size}
-                       |exceed restrict of hive scan maxPartition $maxHivePartition
-                       |You should optimize your SQL logical according partition structure
-                       |or shorten query scope such as p_date, detail as below:
-                       |Table: ${relation.tableMeta.qualifiedName}
-                       |Owner: ${relation.tableMeta.owner}
-                       |Partition Structure: ${relation.partitionCols.map(_.name).mkString(" -> ")}
-                       |""".stripMargin)
-                } else {
-                  Nil
-                }
-              case _ =>
-                val totalPartitions = session
-                  .sessionState.catalog.externalCatalog.listPartitionNames(
-                    relation.tableMeta.database,
-                    relation.tableMeta.identifier.table)
-                if (totalPartitions.size > maxHivePartition) {
-                  throw new MaxHivePartitionExceedException(
-                    s"""
-                       |Your SQL job scan a whole huge table without any partition filter,
-                       |You should optimize your SQL logical according partition structure
-                       |or shorten query scope such as p_date, detail as below:
-                       |Table: ${relation.tableMeta.qualifiedName}
-                       |Owner: ${relation.tableMeta.owner}
-                       |Partition Structure: ${relation.partitionCols.map(_.name).mkString(" -> ")}
-                       |""".stripMargin)
-                } else {
-                  Nil
-                }
-            }
-          case _ => Nil
-        }
-      case _ => Nil
-    }
-  }
-}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala
new file mode 100644
index 0000000..d724c7e
--- /dev/null
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxPartitionStrategy.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
+import org.apache.spark.sql.catalyst.planning.ScanOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+/**
+ * Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table
+ * 1 Check if scan exceed maxPartition
+ * 2 Check if Using partitionFilter on partitioned table
+ * This Strategy Add Planner Strategy after LogicalOptimizer
+ */
+case class MaxPartitionStrategy(session: SparkSession)
+  extends Strategy
+  with SQLConfHelper
+  with PruneFileSourcePartitionHelper {
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    val maxScanPartitionsOpt = conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS)
+
+    if (maxScanPartitionsOpt.isDefined) {
+      checkRelationMaxPartitions(plan, maxScanPartitionsOpt.get)
+    }
+    Nil
+  }
+
+  private def checkRelationMaxPartitions(
+      plan: LogicalPlan,
+      maxScanPartitions: Int): Unit = {
+    plan match {
+      case ScanOperation(_, _, relation: HiveTableRelation) if relation.isPartitioned =>
+        relation.prunedPartitions match {
+          case Some(prunedPartitions) =>
+            if (prunedPartitions.size > maxScanPartitions) {
+              throw new MaxPartitionExceedException(
+                s"""
+                   |SQL job scan hive partition: ${prunedPartitions.size}
+                   |exceed restrict of hive scan maxPartition $maxScanPartitions
+                   |You should optimize your SQL logical according partition structure
+                   |or shorten query scope such as p_date, detail as below:
+                   |Table: ${relation.tableMeta.qualifiedName}
+                   |Owner: ${relation.tableMeta.owner}
+                   |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
+                   |""".stripMargin)
+            }
+          case _ =>
+            val totalPartitions = session
+              .sessionState.catalog.externalCatalog.listPartitionNames(
+                relation.tableMeta.database,
+                relation.tableMeta.identifier.table)
+            if (totalPartitions.size > maxScanPartitions) {
+              throw new MaxPartitionExceedException(
+                s"""
+                   |Your SQL job scan a whole huge table without any partition filter,
+                   |You should optimize your SQL logical according partition structure
+                   |or shorten query scope such as p_date, detail as below:
+                   |Table: ${relation.tableMeta.qualifiedName}
+                   |Owner: ${relation.tableMeta.owner}
+                   |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
+                   |""".stripMargin)
+            }
+        }
+      case ScanOperation(
+            _,
+            filters,
+            relation @ LogicalRelation(
+              fsRelation @ HadoopFsRelation(
+                fileIndex: InMemoryFileIndex,
+                partitionSchema,
+                _,
+                _,
+                _,
+                _),
+              _,
+              _,
+              _)) if fsRelation.partitionSchemaOption.isDefined =>
+        val (partitionKeyFilters, dataFilter) =
+          getPartitionKeyFiltersAndDataFilters(
+            fsRelation.sparkSession,
+            relation,
+            partitionSchema,
+            filters,
+            relation.output)
+        val prunedPartitionSize = fileIndex.listFiles(
+          partitionKeyFilters.toSeq,
+          dataFilter)
+          .size
+        if (prunedPartitionSize > maxScanPartitions) {
+          throw maxPartitionExceedError(
+            prunedPartitionSize,
+            maxScanPartitions,
+            relation.catalogTable,
+            fileIndex.rootPaths,
+            fsRelation.partitionSchema)
+        }
+      case ScanOperation(
+            _,
+            filters,
+            logicalRelation @ LogicalRelation(
+              fsRelation @ HadoopFsRelation(
+                catalogFileIndex: CatalogFileIndex,
+                partitionSchema,
+                _,
+                _,
+                _,
+                _),
+              _,
+              _,
+              _)) if fsRelation.partitionSchemaOption.isDefined =>
+        val (partitionKeyFilters, _) =
+          getPartitionKeyFiltersAndDataFilters(
+            fsRelation.sparkSession,
+            logicalRelation,
+            partitionSchema,
+            filters,
+            logicalRelation.output)
+
+        val prunedPartitionSize =
+          catalogFileIndex.filterPartitions(
+            partitionKeyFilters.toSeq)
+            .partitionSpec()
+            .partitions
+            .size
+        if (prunedPartitionSize > maxScanPartitions) {
+          throw maxPartitionExceedError(
+            prunedPartitionSize,
+            maxScanPartitions,
+            logicalRelation.catalogTable,
+            catalogFileIndex.rootPaths,
+            fsRelation.partitionSchema)
+        }
+      case _ =>
+    }
+  }
+
+  def maxPartitionExceedError(
+      prunedPartitionSize: Int,
+      maxPartitionSize: Int,
+      tableMeta: Option[CatalogTable],
+      rootPaths: Seq[Path],
+      partitionSchema: StructType): Throwable = {
+    val truncatedPaths =
+      if (rootPaths.length > 5) {
+        rootPaths.slice(0, 5).mkString(",") + """... """ + (rootPaths.length - 5) + " more paths"
+      } else {
+        rootPaths.mkString(",")
+      }
+
+    new MaxPartitionExceedException(
+      s"""
+         |SQL job scan data source partition: $prunedPartitionSize
+         |exceed restrict of data source scan maxPartition $maxPartitionSize
+         |You should optimize your SQL logical according partition structure
+         |or shorten query scope such as p_date, detail as below:
+         |Table: ${tableMeta.map(_.qualifiedName).getOrElse("")}
+         |Owner: ${tableMeta.map(_.owner).getOrElse("")}
+         |RootPaths: $truncatedPaths
+         |Partition Structure: ${partitionSchema.map(_.name).mkString(", ")}
+         |""".stripMargin)
+  }
+}
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/spark/sql/PruneFileSourcePartitionHelper.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/spark/sql/PruneFileSourcePartitionHelper.scala
new file mode 100644
index 0000000..65dd016
--- /dev/null
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/spark/sql/PruneFileSourcePartitionHelper.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.types.StructType
+
+trait PruneFileSourcePartitionHelper extends PredicateHelper {
+
+  def getPartitionKeyFiltersAndDataFilters(
+      sparkSession: SparkSession,
+      relation: LeafNode,
+      partitionSchema: StructType,
+      filters: Seq[Expression],
+      output: Seq[AttributeReference]): (ExpressionSet, Seq[Expression]) = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)),
+      output)
+    val partitionColumns =
+      relation.resolve(partitionSchema, sparkSession.sessionState.analyzer.resolver)
+    val partitionSet = AttributeSet(partitionColumns)
+    val (partitionFilters, dataFilters) = normalizedFilters.partition(f =>
+      f.references.subsetOf(partitionSet))
+    val extraPartitionFilter =
+      dataFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionSet))
+
+    (ExpressionSet(partitionFilters ++ extraPartitionFilter), dataFilters)
+  }
+}
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
index 7fd9e48..642bd7f 100644
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/WatchDogSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
 
 import org.apache.kyuubi.sql.KyuubiSQLConf
-import org.apache.kyuubi.sql.watchdog.MaxHivePartitionExceedException
+import org.apache.kyuubi.sql.watchdog.MaxPartitionExceedException
 
 class WatchDogSuite extends KyuubiSparkSQLExtensionTest {
   override protected def beforeAll(): Unit = {
@@ -31,38 +31,60 @@ class WatchDogSuite extends KyuubiSparkSQLExtensionTest {
   case class LimitAndExpected(limit: Int, expected: Int)
   val limitAndExpecteds = List(LimitAndExpected(1, 1), LimitAndExpected(11, 10))
 
-  test("test watchdog with scan maxHivePartitions") {
-    withTable("test", "temp") {
-      sql(
-        s"""
-           |CREATE TABLE test(i int)
-           |PARTITIONED BY (p int)
-           |STORED AS textfile""".stripMargin)
-      spark.range(0, 10, 1).selectExpr("id as col")
-        .createOrReplaceTempView("temp")
-
-      for (part <- Range(0, 10)) {
-        sql(
-          s"""
-             |INSERT OVERWRITE TABLE test PARTITION (p='$part')
-             |select col from temp""".stripMargin)
-      }
+  private def checkMaxPartition: Unit = {
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS.key -> "100") {
+      checkAnswer(sql("SELECT count(distinct(p)) FROM test"), Row(10) :: Nil)
+    }
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_PARTITIONS.key -> "5") {
+      sql("SELECT * FROM test where p=1").queryExecution.sparkPlan
 
-      withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_HIVEPARTITION.key -> "5") {
+      sql(s"SELECT * FROM test WHERE p in (${Range(0, 5).toList.mkString(",")})")
+        .queryExecution.sparkPlan
 
-        sql("SELECT * FROM test where p=1").queryExecution.sparkPlan
+      intercept[MaxPartitionExceedException](
+        sql("SELECT * FROM test where p != 1").queryExecution.sparkPlan)
 
-        sql(
-          s"SELECT * FROM test WHERE p in (${Range(0, 5).toList.mkString(",")})")
-          .queryExecution.sparkPlan
+      intercept[MaxPartitionExceedException](
+        sql("SELECT * FROM test").queryExecution.sparkPlan)
 
-        intercept[MaxHivePartitionExceedException](
-          sql("SELECT * FROM test").queryExecution.sparkPlan)
+      intercept[MaxPartitionExceedException](sql(
+        s"SELECT * FROM test WHERE p in (${Range(0, 6).toList.mkString(",")})")
+        .queryExecution.sparkPlan)
+    }
+  }
 
-        intercept[MaxHivePartitionExceedException](sql(
-          s"SELECT * FROM test WHERE p in (${Range(0, 6).toList.mkString(",")})")
-          .queryExecution.sparkPlan)
+  test("watchdog with scan maxPartitions -- hive") {
+    Seq("textfile", "parquet").foreach { format =>
+      withTable("test", "temp") {
+        sql(
+          s"""
+             |CREATE TABLE test(i int)
+             |PARTITIONED BY (p int)
+             |STORED AS $format""".stripMargin)
+        spark.range(0, 10, 1).selectExpr("id as col")
+          .createOrReplaceTempView("temp")
+
+        for (part <- Range(0, 10)) {
+          sql(
+            s"""
+               |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+               |select col from temp""".stripMargin)
+        }
+        checkMaxPartition
+      }
+    }
+  }
 
+  test("watchdog with scan maxPartitions -- data source") {
+    withTempDir { dir =>
+      withTempView("test") {
+        spark.range(10).selectExpr("id", "id as p")
+          .write
+          .partitionBy("p")
+          .mode("overwrite")
+          .save(dir.getCanonicalPath)
+        spark.read.load(dir.getCanonicalPath).createOrReplaceTempView("test")
+        checkMaxPartition
       }
     }
   }
diff --git a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index b70c039..cf63a6e 100644
--- a/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/dev/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -97,10 +97,12 @@ object KyuubiSQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val WATCHDOG_MAX_HIVEPARTITION =
-    buildConf("spark.sql.watchdog.maxHivePartitions")
-      .doc("Add maxHivePartitions Strategy to avoid scan excessive " +
-        "hive partitions on partitioned table, it's optional that works with defined")
+  val WATCHDOG_MAX_PARTITIONS =
+    buildConf("spark.sql.watchdog.maxPartitions")
+      .doc("Set the max partition number when spark scans a data source. " +
+        "Enable MaxPartitionStrategy by specifying this configuration. " +
+        "Add maxPartitions Strategy to avoid scan excessive partitions " +
+        "on partitioned table, it's optional that works with defined")
       .version("1.4.0")
       .intConf
       .createOptional
diff --git a/docs/sql/rules.md b/docs/sql/rules.md
index 052612c..c502d84 100644
--- a/docs/sql/rules.md
+++ b/docs/sql/rules.md
@@ -76,4 +76,4 @@ spark.sql.optimizer.finalStageConfigIsolation.enabled | false | If true, the fin
 spark.sql.analyzer.classification.enabled | false | When true, allows Kyuubi engine to judge this SQL's classification and set `spark.sql.analyzer.classification` back into sessionConf. Through this configuration item, Spark can optimizing configuration dynamic. | 1.4.0
 spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0
 spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder. | 1.4.0
-spark.sql.watchdog.maxHivePartitions | none | Add maxHivePartitions Strategy to avoid scan excessive hive partitions on partitioned table, it's optional that works with defined. | 1.4.0
+spark.sql.watchdog.maxPartitions | none | Set the max partition number when spark scans a data source. Enable MaxPartitionStrategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined | 1.4.0
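
For readers who want to try the renamed setting, the fragment below is a minimal configuration sketch rather than part of the commit. It assumes the Kyuubi Spark SQL extension jar is already on the Spark classpath and that the extension is enabled through Spark's `spark.sql.extensions` setting. The value 100 is only an illustrative threshold borrowed from the test suite, not a recommended default.

```properties
# Assumption: the Kyuubi extension jar is on the Spark classpath.
# Registers KyuubiSparkSQLExtension, which injects MaxPartitionStrategy.
spark.sql.extensions                 org.apache.kyuubi.sql.KyuubiSparkSQLExtension

# Illustrative value only. The key is optional; leaving it unset keeps the check disabled.
spark.sql.watchdog.maxPartitions     100
```

With the key set, a query whose pruned partition count exceeds the threshold should fail at planning time with MaxPartitionExceedException, which is the behaviour the WatchDogSuite cases in this commit check for both Hive tables and file source tables.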
