This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 28961a6ce00 [SPARK-45517][CONNECT] Expand more exception constructors to support error framework parameters
28961a6ce00 is described below

commit 28961a6ce001e0c25c780a39a726fdd825542cee
Author: Yihong He <[email protected]>
AuthorDate: Tue Oct 17 09:11:22 2023 +0900

    [SPARK-45517][CONNECT] Expand more exception constructors to support error framework parameters
    
    ### What changes were proposed in this pull request?
    
    - Expand more exception constructors to support error framework parameters
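
    A minimal sketch of the shape this enables. The constructor and its
    parameter list are taken from the diff below; the argument values are
    made up for illustration, and the widened constructors remain
    package-private (e.g. `private[sql]`), so only code such as
    `GrpcExceptionConverter` can call them:

    ```scala
    import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException

    // Previously only (message, cause) could be passed through; now the
    // error-framework metadata travels with the rebuilt exception.
    new TableAlreadyExistsException(
      "[TABLE_OR_VIEW_ALREADY_EXISTS] Table `t` already exists.", // message
      None,                                                       // cause
      Some("TABLE_OR_VIEW_ALREADY_EXISTS"),                       // errorClass
      Map("relationName" -> "`t`"))                               // messageParameters
    ```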
    
    ### Why are the changes needed?
    
    - Better integration with the error framework
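
    As an illustration, propagating the error class lets hypothetical
    Connect client code (not part of this patch) branch on the stable
    error class instead of parsing the rendered message text:

    ```scala
    import org.apache.spark.SparkThrowable
    import org.apache.spark.sql.SparkSession

    // Assumed caller: swallow "already exists" failures by error class.
    def createTableIdempotently(spark: SparkSession): Unit = {
      try {
        spark.sql("CREATE TABLE t (i INT)")
      } catch {
        case e: SparkThrowable if e.getErrorClass == "TABLE_OR_VIEW_ALREADY_EXISTS" =>
          // The table already exists; treat the CREATE as a no-op.
      }
    }
    ```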
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    `build/sbt "connect-client-jvm/testOnly *SparkConnectClientSuite"`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #43368 from heyihong/SPARK-45517.
    
    Authored-by: Yihong He <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../scala/org/apache/spark/SparkException.scala    | 16 ++--
 .../sql/streaming/StreamingQueryException.scala    |  3 +-
 .../connect/client/SparkConnectClientSuite.scala   | 28 ++++++-
 .../connect/client/GrpcExceptionConverter.scala    | 93 ++++++++++++++++++----
 .../catalyst/analysis/alreadyExistException.scala  |  6 +-
 .../catalyst/analysis/noSuchItemsExceptions.scala  |  4 +-
 6 files changed, 119 insertions(+), 31 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index 828948b48c1..3bcdd0a7c29 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -133,7 +133,7 @@ private[spark] case class ExecutorDeadException(message: String)
 /**
  * Exception thrown when Spark returns different result after upgrading to a new version.
  */
-private[spark] class SparkUpgradeException private(
+private[spark] class SparkUpgradeException(
   message: String,
   cause: Option[Throwable],
   errorClass: Option[String],
@@ -169,7 +169,7 @@ private[spark] class SparkUpgradeException private(
 /**
  * Arithmetic exception thrown from Spark with an error class.
  */
-private[spark] class SparkArithmeticException private(
+private[spark] class SparkArithmeticException(
     message: String,
     errorClass: Option[String],
     messageParameters: Map[String, String],
@@ -207,7 +207,7 @@ private[spark] class SparkArithmeticException private(
 /**
  * Unsupported operation exception thrown from Spark with an error class.
  */
-private[spark] class SparkUnsupportedOperationException private(
+private[spark] class SparkUnsupportedOperationException(
   message: String,
   errorClass: Option[String],
   messageParameters: Map[String, String])
@@ -271,7 +271,7 @@ private[spark] class SparkConcurrentModificationException(
 /**
  * Datetime exception thrown from Spark with an error class.
  */
-private[spark] class SparkDateTimeException private(
+private[spark] class SparkDateTimeException(
     message: String,
     errorClass: Option[String],
     messageParameters: Map[String, String],
@@ -324,7 +324,7 @@ private[spark] class SparkFileNotFoundException(
 /**
  * Number format exception thrown from Spark with an error class.
  */
-private[spark] class SparkNumberFormatException private(
+private[spark] class SparkNumberFormatException(
     message: String,
     errorClass: Option[String],
     messageParameters: Map[String, String],
@@ -363,7 +363,7 @@ private[spark] class SparkNumberFormatException private(
 /**
  * Illegal argument exception thrown from Spark with an error class.
  */
-private[spark] class SparkIllegalArgumentException private(
+private[spark] class SparkIllegalArgumentException(
     message: String,
     cause: Option[Throwable],
     errorClass: Option[String],
@@ -403,7 +403,7 @@ private[spark] class SparkIllegalArgumentException private(
   override def getQueryContext: Array[QueryContext] = context
 }
 
-private[spark] class SparkRuntimeException private(
+private[spark] class SparkRuntimeException(
     message: String,
     cause: Option[Throwable],
     errorClass: Option[String],
@@ -480,7 +480,7 @@ private[spark] class SparkSecurityException(
 /**
  * Array index out of bounds exception thrown from Spark with an error class.
  */
-private[spark] class SparkArrayIndexOutOfBoundsException private(
+private[spark] class SparkArrayIndexOutOfBoundsException(
   message: String,
   errorClass: Option[String],
   messageParameters: Map[String, String],
diff --git a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 77415fb4759..1fde16bfa4e 100644
--- a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -70,7 +70,8 @@ class StreamingQueryException private[sql](
   /** Time when the exception occurred */
   val time: Long = System.currentTimeMillis
 
-  override def getMessage: String = s"${message}\n${queryDebugString}"
+  override def getMessage: String =
+    if (queryDebugString.isEmpty) message else s"${message}\n${queryDebugString}"
 
   override def toString(): String =
     s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 57e0b4016f1..a3df39da4a8 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -27,10 +27,11 @@ import io.grpc.netty.NettyServerBuilder
 import io.grpc.stub.StreamObserver
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, AnalyzePlanRequest, AnalyzePlanResponse, ArtifactStatusesRequest, ArtifactStatusesResponse, ExecutePlanRequest, ExecutePlanResponse, SparkConnectServiceGrpc}
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connect.common.config.ConnectCommon
 import org.apache.spark.sql.test.ConnectFunSuite
 
@@ -208,6 +209,31 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     }
   }
 
+  for ((name, constructor) <- GrpcExceptionConverter.errorFactory) {
+    test(s"error framework parameters - ${name}") {
+      val testParams = GrpcExceptionConverter.ErrorParams(
+        message = "test message",
+        cause = None,
+        errorClass = Some("test error class"),
+        messageParameters = Map("key" -> "value"),
+        queryContext = Array.empty)
+      val error = constructor(testParams)
+      if (!error.isInstanceOf[ParseException]) {
+        assert(error.getMessage == testParams.message)
+      } else {
+        assert(error.getMessage == s"\n${testParams.message}")
+      }
+      assert(error.getCause == null)
+      error match {
+        case sparkThrowable: SparkThrowable =>
+          assert(sparkThrowable.getErrorClass == testParams.errorClass.get)
+          assert(sparkThrowable.getMessageParameters.asScala == testParams.messageParameters)
+          assert(sparkThrowable.getQueryContext.isEmpty)
+        case _ =>
+      }
+    }
+  }
+
   private case class TestPackURI(
       connectionString: String,
       isCorrect: Boolean,
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index a9a6102a0fe..f404cfd2e41 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -162,9 +162,9 @@ private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlocki
   }
 }
 
-private object GrpcExceptionConverter {
+private[client] object GrpcExceptionConverter {
 
-  private case class ErrorParams(
+  private[client] case class ErrorParams(
       message: String,
       cause: Option[Throwable],
      // errorClass will only be set if the error is both enriched and SparkThrowable.
@@ -180,7 +180,7 @@ private object GrpcExceptionConverter {
     (className, throwableCtr)
   }
 
-  private val errorFactory = Map(
+  private[client] val errorFactory = Map(
     errorConstructor(params =>
       new StreamingQueryException(
         params.message,
@@ -203,23 +203,84 @@ private object GrpcExceptionConverter {
         errorClass = params.errorClass,
         messageParameters = params.messageParameters,
         context = params.queryContext)),
-    errorConstructor(params => new NamespaceAlreadyExistsException(params.message)),
-    errorConstructor(params => new TableAlreadyExistsException(params.message, params.cause)),
-    errorConstructor(params => new TempTableAlreadyExistsException(params.message, params.cause)),
-    errorConstructor(params => new NoSuchDatabaseException(params.message, params.cause)),
-    errorConstructor(params => new NoSuchTableException(params.message, params.cause)),
+    errorConstructor(params =>
+      new NamespaceAlreadyExistsException(
+        params.message,
+        params.errorClass,
+        params.messageParameters)),
+    errorConstructor(params =>
+      new TableAlreadyExistsException(
+        params.message,
+        params.cause,
+        params.errorClass,
+        params.messageParameters)),
+    errorConstructor(params =>
+      new TempTableAlreadyExistsException(
+        params.message,
+        params.cause,
+        params.errorClass,
+        params.messageParameters)),
+    errorConstructor(params =>
+      new NoSuchDatabaseException(
+        params.message,
+        params.cause,
+        params.errorClass,
+        params.messageParameters)),
+    errorConstructor(params =>
+      new NoSuchTableException(
+        params.message,
+        params.cause,
+        params.errorClass,
+        params.messageParameters)),
     errorConstructor[NumberFormatException](params =>
-      new SparkNumberFormatException(params.message)),
+      new SparkNumberFormatException(
+        params.message,
+        params.errorClass,
+        params.messageParameters,
+        params.queryContext)),
     errorConstructor[IllegalArgumentException](params =>
-      new SparkIllegalArgumentException(params.message, params.cause)),
-    errorConstructor[ArithmeticException](params => new SparkArithmeticException(params.message)),
+      new SparkIllegalArgumentException(
+        params.message,
+        params.cause,
+        params.errorClass,
+        params.messageParameters,
+        params.queryContext)),
+    errorConstructor[ArithmeticException](params =>
+      new SparkArithmeticException(
+        params.message,
+        params.errorClass,
+        params.messageParameters,
+        params.queryContext)),
     errorConstructor[UnsupportedOperationException](params =>
-      new SparkUnsupportedOperationException(params.message)),
+      new SparkUnsupportedOperationException(
+        params.message,
+        params.errorClass,
+        params.messageParameters)),
     errorConstructor[ArrayIndexOutOfBoundsException](params =>
-      new SparkArrayIndexOutOfBoundsException(params.message)),
-    errorConstructor[DateTimeException](params => new SparkDateTimeException(params.message)),
-    errorConstructor(params => new SparkRuntimeException(params.message, params.cause)),
-    errorConstructor(params => new SparkUpgradeException(params.message, params.cause)),
+      new SparkArrayIndexOutOfBoundsException(
+        params.message,
+        params.errorClass,
+        params.messageParameters,
+        params.queryContext)),
+    errorConstructor[DateTimeException](params =>
+      new SparkDateTimeException(
+        params.message,
+        params.errorClass,
+        params.messageParameters,
+        params.queryContext)),
+    errorConstructor(params =>
+      new SparkRuntimeException(
+        params.message,
+        params.cause,
+        params.errorClass,
+        params.messageParameters,
+        params.queryContext)),
+    errorConstructor(params =>
+      new SparkUpgradeException(
+        params.message,
+        params.cause,
+        params.errorClass,
+        params.messageParameters)),
     errorConstructor(params =>
       new SparkException(
         message = params.message,
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/alreadyExistException.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/alreadyExistException.scala
index adbfafd8d7f..2e1c3380960 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/alreadyExistException.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/alreadyExistException.scala
@@ -31,7 +31,7 @@ class DatabaseAlreadyExistsException(db: String)
   extends NamespaceAlreadyExistsException(Array(db))
 
// any changes to this class should be backward compatible as it may be used by external connectors
-class NamespaceAlreadyExistsException private(
+class NamespaceAlreadyExistsException private[sql](
     message: String,
     errorClass: Option[String],
     messageParameters: Map[String, String])
@@ -61,7 +61,7 @@ class NamespaceAlreadyExistsException private(
 }
 
// any changes to this class should be backward compatible as it may be used by external connectors
-class TableAlreadyExistsException private(
+class TableAlreadyExistsException private[sql](
     message: String,
     cause: Option[Throwable],
     errorClass: Option[String],
@@ -115,7 +115,7 @@ class TableAlreadyExistsException private(
   }
 }
 
-class TempTableAlreadyExistsException private(
+class TempTableAlreadyExistsException private[sql](
   message: String,
   cause: Option[Throwable],
   errorClass: Option[String],
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/noSuchItemsExceptions.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/noSuchItemsExceptions.scala
index d6283a347cc..48c61381b1d 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/noSuchItemsExceptions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/noSuchItemsExceptions.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.catalog.Identifier
 * Thrown by a catalog when an item cannot be found. The analyzer will rethrow the exception
 * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information.
  */
-class NoSuchDatabaseException private(
+class NoSuchDatabaseException private[sql](
   message: String,
   cause: Option[Throwable],
   errorClass: Option[String],
@@ -100,7 +100,7 @@ class NoSuchNamespaceException private(
 }
 
// any changes to this class should be backward compatible as it may be used by external connectors
-class NoSuchTableException private(
+class NoSuchTableException private[sql](
     message: String,
     cause: Option[Throwable],
     errorClass: Option[String],


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
