This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b3398e695d92 [SPARK-46381][SQL] Migrate sub-classes of 
`AnalysisException` to error classes
b3398e695d92 is described below

commit b3398e695d929c7f867d408c28fb274509c9854c
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Wed Dec 13 12:28:31 2023 +0300

    [SPARK-46381][SQL] Migrate sub-classes of `AnalysisException` to error 
classes
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to migrate the remaining two sub-classes of 
`AnalysisException` onto error classes:
    - NonEmptyNamespaceException
    - ExtendedAnalysisException
    
    and to forbid raising such exceptions without an error class.
    
    ### Why are the changes needed?
    This is part of the migration onto the error framework and of the effort to unify errors 
in Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, if the user's code depends on the format of error messages.
    
    ### How was this patch tested?
    By existing test suites like:
    ```
    $ PYSPARK_PYTHON=python3 build/sbt "sql/testOnly 
org.apache.spark.sql.SQLQueryTestSuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44314 from MaxGekk/error-class-ExtendedAnalysisException.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../utils/src/main/resources/error/error-classes.json  | 15 +++++++++++++++
 .../sql/catalyst/analysis/NonEmptyException.scala      | 18 ++++++++++++------
 .../spark/sql/catalyst/ExtendedAnalysisException.scala |  2 +-
 .../sql/catalyst/analysis/ResolveTimeWindows.scala     |  7 +++++--
 .../analysis/UnsupportedOperationChecker.scala         |  5 ++++-
 .../spark/sql/errors/QueryCompilationErrors.scala      |  9 +++------
 .../scala/org/apache/spark/sql/jdbc/DB2Dialect.scala   |  2 +-
 .../org/apache/spark/sql/jdbc/MsSqlServerDialect.scala |  2 +-
 .../org/apache/spark/sql/jdbc/PostgresDialect.scala    |  2 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala     |  8 ++++++--
 10 files changed, 49 insertions(+), 21 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index d52ffc011b72..2aa5420eb22c 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -6958,6 +6958,21 @@
       "<message>"
     ]
   },
+  "_LEGACY_ERROR_TEMP_3101" : {
+    "message" : [
+      "The input is not a correct window column: <windowTime>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3102" : {
+    "message" : [
+      "<msg>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3103" : {
+    "message" : [
+      "Namespace '<namespace>' is non empty. <details>"
+    ]
+  },
   "_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
     "message" : [
       "<errorMessage>"
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
index 2aea9bac12fe..6475ac3093fe 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
@@ -25,12 +25,18 @@ import 
org.apache.spark.sql.catalyst.util.QuotingUtils.quoted
  * Thrown by a catalog when an item already exists. The analyzer will rethrow 
the exception
  * as an [[org.apache.spark.sql.AnalysisException]] with the correct position 
information.
  */
-case class NonEmptyNamespaceException private(
-    override val message: String,
+case class NonEmptyNamespaceException(
+    namespace: Array[String],
+    details: String,
     override val cause: Option[Throwable] = None)
-  extends AnalysisException(message, cause = cause) {
+  extends AnalysisException(
+    errorClass = "_LEGACY_ERROR_TEMP_3103",
+    messageParameters = Map(
+      "namespace" -> quoted(namespace),
+      "details" -> details)) {
 
-  def this(namespace: Array[String]) = {
-    this(s"Namespace '${quoted(namespace)}' is non empty.")
-  }
+  def this(namespace: Array[String]) = this(namespace, "", None)
+
+  def this(details: String, cause: Option[Throwable]) =
+    this(Array.empty, details, cause)
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ExtendedAnalysisException.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ExtendedAnalysisException.scala
index 2eb7054edceb..1565935a8739 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ExtendedAnalysisException.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ExtendedAnalysisException.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 /**
  * Internal [[AnalysisException]] that also captures a [[LogicalPlan]].
  */
-class ExtendedAnalysisException(
+class ExtendedAnalysisException private(
     message: String,
     line: Option[Int] = None,
     startPosition: Option[Int] = None,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index 1ee218f9369c..a6688f276621 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.ExtendedAnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression, 
GetStructField, IsNotNull, LessThan, Literal, PreciseTimestampConversion, 
SessionWindow, Subtract, TimeWindow, WindowTime}
 import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, 
LogicalPlan, Project}
@@ -309,9 +310,11 @@ object ResolveWindowTime extends Rule[LogicalPlan] {
 
           if (!metadata.contains(TimeWindow.marker) &&
             !metadata.contains(SessionWindow.marker)) {
-            // FIXME: error framework?
             throw new ExtendedAnalysisException(
-              s"The input is not a correct window column: $windowTime", plan = 
Some(p))
+              new AnalysisException(
+                errorClass = "_LEGACY_ERROR_TEMP_3101",
+                messageParameters = Map("windowTime" -> windowTime.toString)),
+              plan = p)
           }
 
           val newMetadata = new MetadataBuilder()
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 0394a697c12f..68f93a1f2113 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -552,7 +552,10 @@ object UnsupportedOperationChecker extends Logging {
 
   private def throwError(msg: String)(implicit operator: LogicalPlan): Nothing 
= {
     throw new ExtendedAnalysisException(
-      msg, operator.origin.line, operator.origin.startPosition, Some(operator))
+      new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_3102",
+        messageParameters = Map("msg" -> msg)),
+      plan = operator)
   }
 
   private def checkForStreamStreamJoinWatermark(join: Join): Unit = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 1195e9dd78da..c1ee9b49d8de 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, 
SparkThrowableHelper, SparkUnsupportedOperationException}
+import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, 
SparkUnsupportedOperationException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, 
FunctionIdentifier, InternalRow, QualifiedTableName, TableIdentifier}
 import 
org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, 
FunctionAlreadyExistsException, NamespaceAlreadyExistsException, 
NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, 
NoSuchTableException, ResolvedTable, Star, TableAlreadyExistsException, 
UnresolvedRegex}
@@ -1955,12 +1955,9 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
   }
 
   def streamJoinStreamWithoutEqualityPredicateUnsupportedError(plan: 
LogicalPlan): Throwable = {
-    val errorClass = "_LEGACY_ERROR_TEMP_1181"
     new ExtendedAnalysisException(
-      SparkThrowableHelper.getMessage(errorClass, Map.empty[String, String]),
-      errorClass = Some(errorClass),
-      messageParameters = Map.empty,
-      plan = Some(plan))
+      new AnalysisException(errorClass = "_LEGACY_ERROR_TEMP_1181", 
messageParameters = Map.empty),
+      plan = plan)
   }
 
   def invalidPandasUDFPlacementError(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 189dedb60f0c..8975a015ee8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -149,7 +149,7 @@ private object DB2Dialect extends JdbcDialect {
       case sqlException: SQLException =>
         sqlException.getSQLState match {
           // https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate
-          case "42893" => throw NonEmptyNamespaceException(message, cause = 
Some(e))
+          case "42893" => throw new NonEmptyNamespaceException(message, cause 
= Some(e))
           case _ => super.classifyException(message, e)
         }
       case _ => super.classifyException(message, e)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 361da645ee4c..ee649122ca80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -194,7 +194,7 @@ private object MsSqlServerDialect extends JdbcDialect {
     e match {
       case sqlException: SQLException =>
         sqlException.getErrorCode match {
-          case 3729 => throw NonEmptyNamespaceException(message, cause = 
Some(e))
+          case 3729 => throw new NonEmptyNamespaceException(message, cause = 
Some(e))
           case _ => super.classifyException(message, e)
         }
       case _ => super.classifyException(message, e)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index e3af80743272..cff7bb5e06f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -254,7 +254,7 @@ private object PostgresDialect extends JdbcDialect with 
SQLConfHelper {
             val indexName = regex.findFirstMatchIn(message).get.group(1)
             val tableName = regex.findFirstMatchIn(message).get.group(2)
             throw new NoSuchIndexException(indexName, tableName, cause = 
Some(e))
-          case "2BP01" => throw NonEmptyNamespaceException(message, cause = 
Some(e))
+          case "2BP01" => throw new NonEmptyNamespaceException(message, cause 
= Some(e))
           case _ => super.classifyException(message, e)
         }
       case unsupported: UnsupportedOperationException => throw unsupported
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 15cbd69e62e0..b603c95fb30d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2638,10 +2638,14 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
 
   test("SPARK-20164: ExtendedAnalysisException should be tolerant to null 
query plan") {
     try {
-      throw new ExtendedAnalysisException("", None, None, plan = null)
+      throw new ExtendedAnalysisException(
+        new AnalysisException(
+          errorClass = "_LEGACY_ERROR_USER_RAISED_EXCEPTION",
+          messageParameters = Map("errorMessage" -> "null query plan")),
+        plan = null)
     } catch {
       case ae: ExtendedAnalysisException =>
-        assert(ae.plan == null && ae.getMessage == ae.getSimpleMessage)
+        assert(ae.plan == None && ae.getMessage == ae.getSimpleMessage)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to