This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 28961a6ce00 [SPARK-45517][CONNECT] Expand more exception constructors
to support error framework parameters
28961a6ce00 is described below
commit 28961a6ce001e0c25c780a39a726fdd825542cee
Author: Yihong He <[email protected]>
AuthorDate: Tue Oct 17 09:11:22 2023 +0900
[SPARK-45517][CONNECT] Expand more exception constructors to support error
framework parameters
### What changes were proposed in this pull request?
- Expand more exception constructors to support error framework parameters
### Why are the changes needed?
- Better integration with the error framework
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
`build/sbt "connect-client-jvm/testOnly *SparkConnectClientSuite"`
### Was this patch authored or co-authored using generative AI tooling?
Closes #43368 from heyihong/SPARK-45517.
Authored-by: Yihong He <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../scala/org/apache/spark/SparkException.scala | 16 ++--
.../sql/streaming/StreamingQueryException.scala | 3 +-
.../connect/client/SparkConnectClientSuite.scala | 28 ++++++-
.../connect/client/GrpcExceptionConverter.scala | 93 ++++++++++++++++++----
.../catalyst/analysis/alreadyExistException.scala | 6 +-
.../catalyst/analysis/noSuchItemsExceptions.scala | 4 +-
6 files changed, 119 insertions(+), 31 deletions(-)
diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index 828948b48c1..3bcdd0a7c29 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -133,7 +133,7 @@ private[spark] case class ExecutorDeadException(message:
String)
/**
* Exception thrown when Spark returns different result after upgrading to a
new version.
*/
-private[spark] class SparkUpgradeException private(
+private[spark] class SparkUpgradeException(
message: String,
cause: Option[Throwable],
errorClass: Option[String],
@@ -169,7 +169,7 @@ private[spark] class SparkUpgradeException private(
/**
* Arithmetic exception thrown from Spark with an error class.
*/
-private[spark] class SparkArithmeticException private(
+private[spark] class SparkArithmeticException(
message: String,
errorClass: Option[String],
messageParameters: Map[String, String],
@@ -207,7 +207,7 @@ private[spark] class SparkArithmeticException private(
/**
* Unsupported operation exception thrown from Spark with an error class.
*/
-private[spark] class SparkUnsupportedOperationException private(
+private[spark] class SparkUnsupportedOperationException(
message: String,
errorClass: Option[String],
messageParameters: Map[String, String])
@@ -271,7 +271,7 @@ private[spark] class SparkConcurrentModificationException(
/**
* Datetime exception thrown from Spark with an error class.
*/
-private[spark] class SparkDateTimeException private(
+private[spark] class SparkDateTimeException(
message: String,
errorClass: Option[String],
messageParameters: Map[String, String],
@@ -324,7 +324,7 @@ private[spark] class SparkFileNotFoundException(
/**
* Number format exception thrown from Spark with an error class.
*/
-private[spark] class SparkNumberFormatException private(
+private[spark] class SparkNumberFormatException private[spark](
message: String,
errorClass: Option[String],
messageParameters: Map[String, String],
@@ -363,7 +363,7 @@ private[spark] class SparkNumberFormatException private(
/**
* Illegal argument exception thrown from Spark with an error class.
*/
-private[spark] class SparkIllegalArgumentException private(
+private[spark] class SparkIllegalArgumentException(
message: String,
cause: Option[Throwable],
errorClass: Option[String],
@@ -403,7 +403,7 @@ private[spark] class SparkIllegalArgumentException private(
override def getQueryContext: Array[QueryContext] = context
}
-private[spark] class SparkRuntimeException private(
+private[spark] class SparkRuntimeException(
message: String,
cause: Option[Throwable],
errorClass: Option[String],
@@ -480,7 +480,7 @@ private[spark] class SparkSecurityException(
/**
* Array index out of bounds exception thrown from Spark with an error class.
*/
-private[spark] class SparkArrayIndexOutOfBoundsException private(
+private[spark] class SparkArrayIndexOutOfBoundsException(
message: String,
errorClass: Option[String],
messageParameters: Map[String, String],
diff --git
a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 77415fb4759..1fde16bfa4e 100644
---
a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++
b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -70,7 +70,8 @@ class StreamingQueryException private[sql](
/** Time when the exception occurred */
val time: Long = System.currentTimeMillis
- override def getMessage: String = s"${message}\n${queryDebugString}"
+ override def getMessage: String =
+ if (queryDebugString.isEmpty) message else
s"${message}\n${queryDebugString}"
override def toString(): String =
s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage}
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 57e0b4016f1..a3df39da4a8 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -27,10 +27,11 @@ import io.grpc.netty.NettyServerBuilder
import io.grpc.stub.StreamObserver
import org.scalatest.BeforeAndAfterEach
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{AddArtifactsRequest,
AddArtifactsResponse, AnalyzePlanRequest, AnalyzePlanResponse,
ArtifactStatusesRequest, ArtifactStatusesResponse, ExecutePlanRequest,
ExecutePlanResponse, SparkConnectServiceGrpc}
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connect.common.config.ConnectCommon
import org.apache.spark.sql.test.ConnectFunSuite
@@ -208,6 +209,31 @@ class SparkConnectClientSuite extends ConnectFunSuite with
BeforeAndAfterEach {
}
}
+ for ((name, constructor) <- GrpcExceptionConverter.errorFactory) {
+ test(s"error framework parameters - ${name}") {
+ val testParams = GrpcExceptionConverter.ErrorParams(
+ message = "test message",
+ cause = None,
+ errorClass = Some("test error class"),
+ messageParameters = Map("key" -> "value"),
+ queryContext = Array.empty)
+ val error = constructor(testParams)
+ if (!error.isInstanceOf[ParseException]) {
+ assert(error.getMessage == testParams.message)
+ } else {
+ assert(error.getMessage == s"\n${testParams.message}")
+ }
+ assert(error.getCause == null)
+ error match {
+ case sparkThrowable: SparkThrowable =>
+ assert(sparkThrowable.getErrorClass == testParams.errorClass.get)
+ assert(sparkThrowable.getMessageParameters.asScala ==
testParams.messageParameters)
+ assert(sparkThrowable.getQueryContext.isEmpty)
+ case _ =>
+ }
+ }
+ }
+
private case class TestPackURI(
connectionString: String,
isCorrect: Boolean,
diff --git
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index a9a6102a0fe..f404cfd2e41 100644
---
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -162,9 +162,9 @@ private[client] class GrpcExceptionConverter(grpcStub:
SparkConnectServiceBlocki
}
}
-private object GrpcExceptionConverter {
+private[client] object GrpcExceptionConverter {
- private case class ErrorParams(
+ private[client] case class ErrorParams(
message: String,
cause: Option[Throwable],
// errorClass will only be set if the error is both enriched and
SparkThrowable.
@@ -180,7 +180,7 @@ private object GrpcExceptionConverter {
(className, throwableCtr)
}
- private val errorFactory = Map(
+ private[client] val errorFactory = Map(
errorConstructor(params =>
new StreamingQueryException(
params.message,
@@ -203,23 +203,84 @@ private object GrpcExceptionConverter {
errorClass = params.errorClass,
messageParameters = params.messageParameters,
context = params.queryContext)),
- errorConstructor(params => new
NamespaceAlreadyExistsException(params.message)),
- errorConstructor(params => new TableAlreadyExistsException(params.message,
params.cause)),
- errorConstructor(params => new
TempTableAlreadyExistsException(params.message, params.cause)),
- errorConstructor(params => new NoSuchDatabaseException(params.message,
params.cause)),
- errorConstructor(params => new NoSuchTableException(params.message,
params.cause)),
+ errorConstructor(params =>
+ new NamespaceAlreadyExistsException(
+ params.message,
+ params.errorClass,
+ params.messageParameters)),
+ errorConstructor(params =>
+ new TableAlreadyExistsException(
+ params.message,
+ params.cause,
+ params.errorClass,
+ params.messageParameters)),
+ errorConstructor(params =>
+ new TempTableAlreadyExistsException(
+ params.message,
+ params.cause,
+ params.errorClass,
+ params.messageParameters)),
+ errorConstructor(params =>
+ new NoSuchDatabaseException(
+ params.message,
+ params.cause,
+ params.errorClass,
+ params.messageParameters)),
+ errorConstructor(params =>
+ new NoSuchTableException(
+ params.message,
+ params.cause,
+ params.errorClass,
+ params.messageParameters)),
errorConstructor[NumberFormatException](params =>
- new SparkNumberFormatException(params.message)),
+ new SparkNumberFormatException(
+ params.message,
+ params.errorClass,
+ params.messageParameters,
+ params.queryContext)),
errorConstructor[IllegalArgumentException](params =>
- new SparkIllegalArgumentException(params.message, params.cause)),
- errorConstructor[ArithmeticException](params => new
SparkArithmeticException(params.message)),
+ new SparkIllegalArgumentException(
+ params.message,
+ params.cause,
+ params.errorClass,
+ params.messageParameters,
+ params.queryContext)),
+ errorConstructor[ArithmeticException](params =>
+ new SparkArithmeticException(
+ params.message,
+ params.errorClass,
+ params.messageParameters,
+ params.queryContext)),
errorConstructor[UnsupportedOperationException](params =>
- new SparkUnsupportedOperationException(params.message)),
+ new SparkUnsupportedOperationException(
+ params.message,
+ params.errorClass,
+ params.messageParameters)),
errorConstructor[ArrayIndexOutOfBoundsException](params =>
- new SparkArrayIndexOutOfBoundsException(params.message)),
- errorConstructor[DateTimeException](params => new
SparkDateTimeException(params.message)),
- errorConstructor(params => new SparkRuntimeException(params.message,
params.cause)),
- errorConstructor(params => new SparkUpgradeException(params.message,
params.cause)),
+ new SparkArrayIndexOutOfBoundsException(
+ params.message,
+ params.errorClass,
+ params.messageParameters,
+ params.queryContext)),
+ errorConstructor[DateTimeException](params =>
+ new SparkDateTimeException(
+ params.message,
+ params.errorClass,
+ params.messageParameters,
+ params.queryContext)),
+ errorConstructor(params =>
+ new SparkRuntimeException(
+ params.message,
+ params.cause,
+ params.errorClass,
+ params.messageParameters,
+ params.queryContext)),
+ errorConstructor(params =>
+ new SparkUpgradeException(
+ params.message,
+ params.cause,
+ params.errorClass,
+ params.messageParameters)),
errorConstructor(params =>
new SparkException(
message = params.message,
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/alreadyExistException.scala
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/alreadyExistException.scala
index adbfafd8d7f..2e1c3380960 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/alreadyExistException.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/alreadyExistException.scala
@@ -31,7 +31,7 @@ class DatabaseAlreadyExistsException(db: String)
extends NamespaceAlreadyExistsException(Array(db))
// any changes to this class should be backward compatible as it may be used
by external connectors
-class NamespaceAlreadyExistsException private(
+class NamespaceAlreadyExistsException private[sql](
message: String,
errorClass: Option[String],
messageParameters: Map[String, String])
@@ -61,7 +61,7 @@ class NamespaceAlreadyExistsException private(
}
// any changes to this class should be backward compatible as it may be used
by external connectors
-class TableAlreadyExistsException private(
+class TableAlreadyExistsException private[sql](
message: String,
cause: Option[Throwable],
errorClass: Option[String],
@@ -115,7 +115,7 @@ class TableAlreadyExistsException private(
}
}
-class TempTableAlreadyExistsException private(
+class TempTableAlreadyExistsException private[sql](
message: String,
cause: Option[Throwable],
errorClass: Option[String],
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/noSuchItemsExceptions.scala
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/noSuchItemsExceptions.scala
index d6283a347cc..48c61381b1d 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/noSuchItemsExceptions.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/noSuchItemsExceptions.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.catalog.Identifier
* Thrown by a catalog when an item cannot be found. The analyzer will rethrow
the exception
* as an [[org.apache.spark.sql.AnalysisException]] with the correct position
information.
*/
-class NoSuchDatabaseException private(
+class NoSuchDatabaseException private[sql](
message: String,
cause: Option[Throwable],
errorClass: Option[String],
@@ -100,7 +100,7 @@ class NoSuchNamespaceException private(
}
// any changes to this class should be backward compatible as it may be used
by external connectors
-class NoSuchTableException private(
+class NoSuchTableException private[sql](
message: String,
cause: Option[Throwable],
errorClass: Option[String],
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]