This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 26527c43e65 [SPARK-43299][SS][CONNECT] Convert StreamingQueryException in Scala Client
26527c43e65 is described below

commit 26527c43e652718c6d6be8f2ae2f92e835e1b328
Author: Yihong He <[email protected]>
AuthorDate: Tue Oct 10 08:41:07 2023 +0900

    [SPARK-43299][SS][CONNECT] Convert StreamingQueryException in Scala Client
    
    ### What changes were proposed in this pull request?
    
    - Convert StreamingQueryException in Scala Client
    - Move StreamingQueryException to common/utils module
    - Implement (message, cause) constructor and getMessage() for StreamingQueryException
    - Get StreamingQueryException in StreamingQuery.exception by throw instead of GRPC response
    
    ### Why are the changes needed?
    
    - Compatibility with the existing behavior
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    - Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #42859 from heyihong/SPARK-43299.
    
    Authored-by: Yihong He <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../sql/streaming/StreamingQueryException.scala    | 10 +++++
 .../spark/sql/streaming/StreamingQuery.scala       | 18 ++++-----
 .../sql/streaming/StreamingQueryException.scala    | 47 ----------------------
 .../CheckConnectJvmClientCompatibility.scala       | 12 ------
 .../sql/streaming/ClientStreamingQuerySuite.scala  |  4 +-
 .../connect/client/GrpcExceptionConverter.scala    |  7 ++++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  6 +++
 project/MimaExcludes.scala                         |  3 ++
 8 files changed, 37 insertions(+), 70 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
similarity index 89%
rename from sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
rename to common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 738c79769bb..77415fb4759 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -42,6 +42,14 @@ class StreamingQueryException private[sql](
     messageParameters: Map[String, String])
   extends Exception(message, cause) with SparkThrowable {
 
+  private[spark] def this(
+      message: String,
+      cause: Throwable,
+      errorClass: String,
+      messageParameters: Map[String, String]) = {
+    this("", message, cause, null, null, errorClass, messageParameters)
+  }
+
   def this(
       queryDebugString: String,
       cause: Throwable,
@@ -62,6 +70,8 @@ class StreamingQueryException private[sql](
   /** Time when the exception occurred */
   val time: Long = System.currentTimeMillis
 
+  override def getMessage: String = s"${message}\n${queryDebugString}"
+
   override def toString(): String =
     s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage}
        |$queryDebugString""".stripMargin
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index a48367b468d..48ef0a907b5 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -242,17 +242,15 @@ class RemoteStreamingQuery(
   }
 
   override def exception: Option[StreamingQueryException] = {
-    val exception = executeQueryCmd(_.setException(true)).getException
-    if (exception.hasExceptionMessage) {
-      Some(
-        new StreamingQueryException(
-          // message maps to the return value of original StreamingQueryException's toString method
-          message = exception.getExceptionMessage,
-          errorClass = exception.getErrorClass,
-          stackTrace = exception.getStackTrace))
-    } else {
-      None
+    try {
+      // When exception field is set to false, the server throws a StreamingQueryException
+      // to the client.
+      executeQueryCmd(_.setException(false))
+    } catch {
+      case e: StreamingQueryException => return Some(e)
     }
+
+    None
   }
 
   private def executeQueryCmd(
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
deleted file mode 100644
index 512c94f5c70..00000000000
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.SparkThrowable
-import org.apache.spark.annotation.Evolving
-
-/**
- * Exception that stopped a [[StreamingQuery]] in Spark Connect. Currently not all fields in the
- * original StreamingQueryException are supported.
- * @param message
- *   Maps to return value of original StreamingQueryException's toString method
- * @param errorClass
- *   Error class of this exception
- * @param stackTrace
- *   Stack trace of this exception
- * @since 3.5.0
- */
-@Evolving
-class StreamingQueryException private[sql] (
-    message: String,
-    errorClass: String,
-    stackTrace: String)
-    extends Exception(message)
-    with SparkThrowable {
-
-  override def getErrorClass: String = errorClass
-
-  override def toString: String = s"""$message
-    |JVM stacktrace: $stackTrace
-    |""".stripMargin
-}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index a6cd20aff68..785e1fa4017 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -254,18 +254,6 @@ object CheckConnectJvmClientCompatibility {
         "org.apache.spark.sql.streaming.DataStreamWriter.SOURCE*" // These are 
constant vals.
       ),
 
-      // StreamingQueryException
-      ProblemFilters.exclude[DirectMissingMethodProblem](
-        "org.apache.spark.sql.streaming.StreamingQueryException.message"),
-      ProblemFilters.exclude[DirectMissingMethodProblem](
-        "org.apache.spark.sql.streaming.StreamingQueryException.cause"),
-      ProblemFilters.exclude[DirectMissingMethodProblem](
-        "org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
-      ProblemFilters.exclude[DirectMissingMethodProblem](
-        "org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
-      ProblemFilters.exclude[DirectMissingMethodProblem](
-        "org.apache.spark.sql.streaming.StreamingQueryException.time"),
-
       // Classes missing from streaming API
       ProblemFilters.exclude[MissingClassProblem](
         "org.apache.spark.sql.streaming.TestGroupState"),
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
index 8bb83a1ac25..f995a0c3d94 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
@@ -199,10 +199,12 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
         .format("console")
         .start()
 
-      val exception = intercept[SparkException] {
+      val exception = intercept[StreamingQueryException] {
         query.awaitTermination()
       }
 
+      assert(exception.getErrorClass != null)
+      assert(!exception.getMessageParameters.isEmpty)
       assert(exception.getCause.isInstanceOf[SparkException])
       assert(exception.getCause.getCause.isInstanceOf[SparkException])
       assert(exception.getCause.getCause.getCause.isInstanceOf[SparkException])
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 1d608bdf03c..85a523a1372 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.streaming.StreamingQueryException
 
 /**
 * GrpcExceptionConverter handles the conversion of StatusRuntimeExceptions into Spark exceptions.
@@ -178,6 +179,12 @@ private object GrpcExceptionConverter {
   }
 
   private val errorFactory = Map(
+    errorConstructor(params =>
+      new StreamingQueryException(
+        params.message,
+        params.cause.orNull,
+        params.errorClass.orNull,
+        params.messageParameters)),
     errorConstructor(params =>
       new ParseException(
         None,
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 52c61c2cbe9..eead5cb38ad 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -3076,6 +3076,12 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       case StreamingQueryCommand.CommandCase.EXCEPTION =>
         val result = query.exception
         if (result.isDefined) {
+          // Throw StreamingQueryException directly and rely on error translation on the
+          // client-side to reconstruct the exception. Keep the remaining implementation
+          // for backward-compatibility
+          if (!command.getException) {
+            throw result.get
+          }
           val e = result.get
           val exception_builder = StreamingQueryCommandResult.ExceptionResult
             .newBuilder()
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 52440ca7d17..691425cfc5d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -74,6 +74,9 @@ object MimaExcludes {
     
ProblemFilters.exclude[Problem]("org.sparkproject.spark_protobuf.protobuf.*"),
     
ProblemFilters.exclude[Problem]("org.apache.spark.sql.protobuf.utils.SchemaConverters.*"),
 
+    // SPARK-43299: Convert StreamingQueryException in Scala Client
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException"),
+
     (problem: Problem) => problem match {
      case MissingClassProblem(cls) => !cls.fullName.startsWith("org.sparkproject.jpmml") &&
           !cls.fullName.startsWith("org.sparkproject.dmg.pmml")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to