This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b3398e695d92 [SPARK-46381][SQL] Migrate sub-classes of
`AnalysisException` to error classes
b3398e695d92 is described below
commit b3398e695d929c7f867d408c28fb274509c9854c
Author: Max Gekk <[email protected]>
AuthorDate: Wed Dec 13 12:28:31 2023 +0300
[SPARK-46381][SQL] Migrate sub-classes of `AnalysisException` to error
classes
### What changes were proposed in this pull request?
In this PR, I propose to migrate the remaining two sub-classes of
`AnalysisException` onto error classes:
- NonEmptyNamespaceException
- ExtendedAnalysisException
and forbid raising such exceptions without an error class.
### Why are the changes needed?
This is part of the migration onto the error framework, and of unifying errors
in Spark.
### Does this PR introduce _any_ user-facing change?
Yes, if user's code depends on the format of error messages.
### How was this patch tested?
By existing test suites like:
```
$ PYSPARK_PYTHON=python3 build/sbt "sql/testOnly
org.apache.spark.sql.SQLQueryTestSuite"
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44314 from MaxGekk/error-class-ExtendedAnalysisException.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../utils/src/main/resources/error/error-classes.json | 15 +++++++++++++++
.../sql/catalyst/analysis/NonEmptyException.scala | 18 ++++++++++++------
.../spark/sql/catalyst/ExtendedAnalysisException.scala | 2 +-
.../sql/catalyst/analysis/ResolveTimeWindows.scala | 7 +++++--
.../analysis/UnsupportedOperationChecker.scala | 5 ++++-
.../spark/sql/errors/QueryCompilationErrors.scala | 9 +++------
.../scala/org/apache/spark/sql/jdbc/DB2Dialect.scala | 2 +-
.../org/apache/spark/sql/jdbc/MsSqlServerDialect.scala | 2 +-
.../org/apache/spark/sql/jdbc/PostgresDialect.scala | 2 +-
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 8 ++++++--
10 files changed, 49 insertions(+), 21 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json
b/common/utils/src/main/resources/error/error-classes.json
index d52ffc011b72..2aa5420eb22c 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -6958,6 +6958,21 @@
"<message>"
]
},
+ "_LEGACY_ERROR_TEMP_3101" : {
+ "message" : [
+ "The input is not a correct window column: <windowTime>"
+ ]
+ },
+ "_LEGACY_ERROR_TEMP_3102" : {
+ "message" : [
+ "<msg>"
+ ]
+ },
+ "_LEGACY_ERROR_TEMP_3103" : {
+ "message" : [
+ "Namespace '<namespace>' is non empty. <details>"
+ ]
+ },
"_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
"message" : [
"<errorMessage>"
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
index 2aea9bac12fe..6475ac3093fe 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
@@ -25,12 +25,18 @@ import
org.apache.spark.sql.catalyst.util.QuotingUtils.quoted
* Thrown by a catalog when an item already exists. The analyzer will rethrow
the exception
* as an [[org.apache.spark.sql.AnalysisException]] with the correct position
information.
*/
-case class NonEmptyNamespaceException private(
- override val message: String,
+case class NonEmptyNamespaceException(
+ namespace: Array[String],
+ details: String,
override val cause: Option[Throwable] = None)
- extends AnalysisException(message, cause = cause) {
+ extends AnalysisException(
+ errorClass = "_LEGACY_ERROR_TEMP_3103",
+ messageParameters = Map(
+ "namespace" -> quoted(namespace),
+ "details" -> details)) {
- def this(namespace: Array[String]) = {
- this(s"Namespace '${quoted(namespace)}' is non empty.")
- }
+ def this(namespace: Array[String]) = this(namespace, "", None)
+
+ def this(details: String, cause: Option[Throwable]) =
+ this(Array.empty, details, cause)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ExtendedAnalysisException.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ExtendedAnalysisException.scala
index 2eb7054edceb..1565935a8739 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ExtendedAnalysisException.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ExtendedAnalysisException.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
/**
* Internal [[AnalysisException]] that also captures a [[LogicalPlan]].
*/
-class ExtendedAnalysisException(
+class ExtendedAnalysisException private(
message: String,
line: Option[Int] = None,
startPosition: Option[Int] = None,
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index 1ee218f9369c..a6688f276621 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression,
GetStructField, IsNotNull, LessThan, Literal, PreciseTimestampConversion,
SessionWindow, Subtract, TimeWindow, WindowTime}
import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter,
LogicalPlan, Project}
@@ -309,9 +310,11 @@ object ResolveWindowTime extends Rule[LogicalPlan] {
if (!metadata.contains(TimeWindow.marker) &&
!metadata.contains(SessionWindow.marker)) {
- // FIXME: error framework?
throw new ExtendedAnalysisException(
- s"The input is not a correct window column: $windowTime", plan =
Some(p))
+ new AnalysisException(
+ errorClass = "_LEGACY_ERROR_TEMP_3101",
+ messageParameters = Map("windowTime" -> windowTime.toString)),
+ plan = p)
}
val newMetadata = new MetadataBuilder()
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 0394a697c12f..68f93a1f2113 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -552,7 +552,10 @@ object UnsupportedOperationChecker extends Logging {
private def throwError(msg: String)(implicit operator: LogicalPlan): Nothing
= {
throw new ExtendedAnalysisException(
- msg, operator.origin.line, operator.origin.startPosition, Some(operator))
+ new AnalysisException(
+ errorClass = "_LEGACY_ERROR_TEMP_3102",
+ messageParameters = Map("msg" -> msg)),
+ plan = operator)
}
private def checkForStreamStreamJoinWatermark(join: Join): Unit = {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 1195e9dd78da..c1ee9b49d8de 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
import org.apache.hadoop.fs.Path
-import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable,
SparkThrowableHelper, SparkUnsupportedOperationException}
+import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable,
SparkUnsupportedOperationException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{ExtendedAnalysisException,
FunctionIdentifier, InternalRow, QualifiedTableName, TableIdentifier}
import
org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException,
FunctionAlreadyExistsException, NamespaceAlreadyExistsException,
NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException,
NoSuchTableException, ResolvedTable, Star, TableAlreadyExistsException,
UnresolvedRegex}
@@ -1955,12 +1955,9 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
}
def streamJoinStreamWithoutEqualityPredicateUnsupportedError(plan:
LogicalPlan): Throwable = {
- val errorClass = "_LEGACY_ERROR_TEMP_1181"
new ExtendedAnalysisException(
- SparkThrowableHelper.getMessage(errorClass, Map.empty[String, String]),
- errorClass = Some(errorClass),
- messageParameters = Map.empty,
- plan = Some(plan))
+ new AnalysisException(errorClass = "_LEGACY_ERROR_TEMP_1181",
messageParameters = Map.empty),
+ plan = plan)
}
def invalidPandasUDFPlacementError(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 189dedb60f0c..8975a015ee8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -149,7 +149,7 @@ private object DB2Dialect extends JdbcDialect {
case sqlException: SQLException =>
sqlException.getSQLState match {
// https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate
- case "42893" => throw NonEmptyNamespaceException(message, cause =
Some(e))
+ case "42893" => throw new NonEmptyNamespaceException(message, cause
= Some(e))
case _ => super.classifyException(message, e)
}
case _ => super.classifyException(message, e)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 361da645ee4c..ee649122ca80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -194,7 +194,7 @@ private object MsSqlServerDialect extends JdbcDialect {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
- case 3729 => throw NonEmptyNamespaceException(message, cause =
Some(e))
+ case 3729 => throw new NonEmptyNamespaceException(message, cause =
Some(e))
case _ => super.classifyException(message, e)
}
case _ => super.classifyException(message, e)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index e3af80743272..cff7bb5e06f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -254,7 +254,7 @@ private object PostgresDialect extends JdbcDialect with
SQLConfHelper {
val indexName = regex.findFirstMatchIn(message).get.group(1)
val tableName = regex.findFirstMatchIn(message).get.group(2)
throw new NoSuchIndexException(indexName, tableName, cause =
Some(e))
- case "2BP01" => throw NonEmptyNamespaceException(message, cause =
Some(e))
+ case "2BP01" => throw new NonEmptyNamespaceException(message, cause
= Some(e))
case _ => super.classifyException(message, e)
}
case unsupported: UnsupportedOperationException => throw unsupported
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 15cbd69e62e0..b603c95fb30d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2638,10 +2638,14 @@ class SQLQuerySuite extends QueryTest with
SharedSparkSession with AdaptiveSpark
test("SPARK-20164: ExtendedAnalysisException should be tolerant to null
query plan") {
try {
- throw new ExtendedAnalysisException("", None, None, plan = null)
+ throw new ExtendedAnalysisException(
+ new AnalysisException(
+ errorClass = "_LEGACY_ERROR_USER_RAISED_EXCEPTION",
+ messageParameters = Map("errorMessage" -> "null query plan")),
+ plan = null)
} catch {
case ae: ExtendedAnalysisException =>
- assert(ae.plan == null && ae.getMessage == ae.getSimpleMessage)
+ assert(ae.plan == None && ae.getMessage == ae.getSimpleMessage)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]