This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fd6d1b4  [SPARK-30326][SQL] Raise exception if analyzer exceed max 
iterations
fd6d1b4 is described below

commit fd6d1b400630d7fee6d031e6de1fccfb4993778b
Author: Eric Wu <[email protected]>
AuthorDate: Mon Feb 10 23:41:39 2020 +0800

    [SPARK-30326][SQL] Raise exception if analyzer exceed max iterations
    
    ### What changes were proposed in this pull request?
    Enhance RuleExecutor strategy to take different actions when exceeding max 
iterations. And raise exception if analyzer exceed max iterations.
    
    ### Why are the changes needed?
    Currently, both analyzer and optimizer just log warning message if rule 
execution exceed max iterations. They should have different behavior. Analyzer 
should raise exception to indicates the plan is not fixed after max iterations, 
while optimizer just log warning to keep the current plan. This is more 
feasible after SPARK-30138 was introduced.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Add test in AnalysisSuite
    
    Closes #26977 from Eric5553/EnhanceMaxIterations.
    
    Authored-by: Eric Wu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit b2011a295bd78b3693a516e049e90250366b8f52)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 10 +++++++-
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  5 +++-
 .../spark/sql/catalyst/rules/RuleExecutor.scala    | 27 ++++++++++++++++++----
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 25 +++++++++++++++++++-
 4 files changed, 60 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 75f1aa7..ce82b3b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -176,7 +176,15 @@ class Analyzer(
 
   def resolver: Resolver = conf.resolver
 
-  protected val fixedPoint = FixedPoint(maxIterations)
+  /**
+   * If the plan cannot be resolved within maxIterations, analyzer will throw 
exception to inform
+   * user to increase the value of SQLConf.ANALYZER_MAX_ITERATIONS.
+   */
+  protected val fixedPoint =
+    FixedPoint(
+      maxIterations,
+      errorOnExceed = true,
+      maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)
 
   /**
    * Override to provide additional rules for the "Resolution" batch.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0fdf6b0..c90117b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -53,7 +53,10 @@ abstract class Optimizer(catalogManager: CatalogManager)
       "PartitionPruning",
       "Extract Python UDFs")
 
-  protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)
+  protected def fixedPoint =
+    FixedPoint(
+      SQLConf.get.optimizerMaxIterations,
+      maxIterationsSetting = SQLConf.OPTIMIZER_MAX_ITERATIONS.key)
 
   /**
    * Defines the default rule batches in the Optimizer.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 287ae0e..da5242b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -45,7 +45,17 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends 
Logging {
    * An execution strategy for rules that indicates the maximum number of 
executions. If the
    * execution reaches fix point (i.e. converge) before maxIterations, it will 
stop.
    */
-  abstract class Strategy { def maxIterations: Int }
+  abstract class Strategy {
+
+    /** The maximum number of executions. */
+    def maxIterations: Int
+
+    /** Whether to throw exception when exceeding the maximum number. */
+    def errorOnExceed: Boolean = false
+
+    /** The key of SQLConf setting to tune maxIterations */
+    def maxIterationsSetting: String = null
+  }
 
   /** A strategy that is run once and idempotent. */
   case object Once extends Strategy { val maxIterations = 1 }
@@ -54,7 +64,10 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends 
Logging {
    * A strategy that runs until fix point or maxIterations times, whichever 
comes first.
    * Especially, a FixedPoint(1) batch is supposed to run only once.
    */
-  case class FixedPoint(maxIterations: Int) extends Strategy
+  case class FixedPoint(
+    override val maxIterations: Int,
+    override val errorOnExceed: Boolean = false,
+    override val maxIterationsSetting: String = null) extends Strategy
 
   /** A batch of rules. */
   protected case class Batch(name: String, strategy: Strategy, rules: 
Rule[TreeType]*)
@@ -155,8 +168,14 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] 
extends Logging {
         if (iteration > batch.strategy.maxIterations) {
           // Only log if this is a rule that is supposed to run more than once.
           if (iteration != 2) {
-            val message = s"Max iterations (${iteration - 1}) reached for 
batch ${batch.name}"
-            if (Utils.isTesting) {
+            val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
+              "."
+            } else {
+              s", please set '${batch.strategy.maxIterationsSetting}' to a 
larger value."
+            }
+            val message = s"Max iterations (${iteration - 1}) reached for 
batch ${batch.name}" +
+              s"$endingMsg"
+            if (Utils.isTesting || batch.strategy.errorOnExceed) {
               throw new TreeNodeException(curPlan, message, null)
             } else {
               logWarning(message)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index c747d39..d385133 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -25,9 +25,10 @@ import org.scalatest.Matchers
 
 import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
Count, Sum}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
@@ -745,4 +746,26 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       CollectMetrics("evt1", sumWithFilter :: Nil, testRelation),
       "aggregates with filter predicate are not allowed" :: Nil)
   }
+
+  test("Analysis exceed max iterations") {
+    // RuleExecutor only throw exception or log warning when the rule is 
supposed to run
+    // more than once.
+    val maxIterations = 2
+    val conf = new SQLConf().copy(SQLConf.ANALYZER_MAX_ITERATIONS -> 
maxIterations)
+    val testAnalyzer = new Analyzer(
+      new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf), 
conf)
+
+    val plan = testRelation2.select(
+      $"a" / Literal(2) as "div1",
+      $"a" / $"b" as "div2",
+      $"a" / $"c" as "div3",
+      $"a" / $"d" as "div4",
+      $"e" / $"e" as "div5")
+
+    val message = intercept[TreeNodeException[LogicalPlan]] {
+      testAnalyzer.execute(plan)
+    }.getMessage
+    assert(message.startsWith(s"Max iterations ($maxIterations) reached for 
batch Resolution, " +
+      s"please set '${SQLConf.ANALYZER_MAX_ITERATIONS.key}' to a larger 
value."))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to