This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bbe6d573d95e [SPARK-49392][SQL] Catch errors when failing to write to external data source
bbe6d573d95e is described below

commit bbe6d573d95edf800b4a9de1b16a0ccaffabf1a6
Author: Uros Bojanic <[email protected]>
AuthorDate: Fri Sep 13 19:04:49 2024 +0200

    [SPARK-49392][SQL] Catch errors when failing to write to external data source
    
    ### What changes were proposed in this pull request?
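    Catch specific exceptions thrown by the data source API when writing to an external data source fails (`NullPointerException`, `MatchError`, and `ArrayIndexOutOfBoundsException`; the list is not exhaustive). Rethrow them wrapped in `externalDataSourceException` (error condition `DATA_SOURCE_EXTERNAL_ERROR`) to give the user a friendlier error message. Exceptions that are already a `SparkThrowable`, and all other exceptions, are rethrown unchanged.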
    
    ### Why are the changes needed?
    To give users a clearer error when saving the results of a query into an external data source (for example, `com.crealytics.spark.excel`) fails with a non-fatal exception.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Error messages shown when writing to an external data source fails should now be more user-friendly.
    
    ### How was this patch tested?
    New test in `QueryCompilationErrorsSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #47873 from uros-db/external-data-source.
    
    Authored-by: Uros Bojanic <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  6 +++++
 .../src/main/resources/error/error-states.json     |  6 +++++
 .../spark/sql/errors/QueryCompilationErrors.scala  |  8 +++++++
 .../datasources/SaveIntoDataSourceCommand.scala    | 26 +++++++++++++++++++---
 .../sql/errors/QueryCompilationErrorsSuite.scala   | 20 +++++++++++++++++
 5 files changed, 63 insertions(+), 3 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 229da4fa17de..0a9dcd52ea83 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1045,6 +1045,12 @@
     ],
     "sqlState" : "42710"
   },
+  "DATA_SOURCE_EXTERNAL_ERROR" : {
+    "message" : [
+      "Encountered error when saving to external data source."
+    ],
+    "sqlState" : "KD00F"
+  },
   "DATA_SOURCE_NOT_EXIST" : {
     "message" : [
       "Data source '<provider>' not found. Please make sure the data source is 
registered."
diff --git a/common/utils/src/main/resources/error/error-states.json 
b/common/utils/src/main/resources/error/error-states.json
index c369db3f6505..edba6e1d4321 100644
--- a/common/utils/src/main/resources/error/error-states.json
+++ b/common/utils/src/main/resources/error/error-states.json
@@ -7417,6 +7417,12 @@
         "standard": "N",
         "usedBy": ["Databricks"]
     },
+    "KD00F": {
+        "description": "external data source failure",
+        "origin": "Databricks",
+        "standard": "N",
+        "usedBy": ["Databricks"]
+    },
     "P0000": {
         "description": "procedural logic error",
         "origin": "PostgreSQL",
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index fa8ea2f5289f..e4c8c76e958f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3959,6 +3959,14 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       messageParameters = Map("provider" -> name))
   }
 
+  def externalDataSourceException(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "DATA_SOURCE_EXTERNAL_ERROR",
+      messageParameters = Map(),
+      cause = Some(cause)
+    )
+  }
+
   def foundMultipleDataSources(provider: String): Throwable = {
     new AnalysisException(
       errorClass = "FOUND_MULTIPLE_DATA_SOURCES",
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
index 5423232db429..e44f1d35e9cd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
@@ -19,12 +19,14 @@ package org.apache.spark.sql.execution.datasources
 
 import scala.util.control.NonFatal
 
+import org.apache.spark.SparkThrowable
 import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, 
CTERelationDef, LogicalPlan, WithCTE}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.sources.CreatableRelationProvider
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
 
 /**
  * Saves the results of `query` in to a data source.
@@ -44,8 +46,26 @@ case class SaveIntoDataSourceCommand(
   override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val relation = dataSource.createRelation(
-      sparkSession.sqlContext, mode, options, Dataset.ofRows(sparkSession, 
query))
+    var relation: BaseRelation = null
+
+    try {
+      relation = dataSource.createRelation(
+        sparkSession.sqlContext, mode, options, Dataset.ofRows(sparkSession, 
query))
+    } catch {
+      case e: SparkThrowable =>
+        // We should avoid wrapping `SparkThrowable` exceptions into another 
`AnalysisException`.
+        throw e
+      case e @ (_: NullPointerException | _: MatchError | _: 
ArrayIndexOutOfBoundsException) =>
+        // These are some of the exceptions thrown by the data source API. We 
catch these
+        // exceptions here and rethrow 
QueryCompilationErrors.externalDataSourceException to
+        // provide a more friendly error message for the user. This list is 
not exhaustive.
+        throw QueryCompilationErrors.externalDataSourceException(e)
+      case e: Throwable =>
+        // For other exceptions, just rethrow it, since we don't have enough 
information to
+        // provide a better error message for the user at the moment. We may 
want to further
+        // improve the error message handling in the future.
+        throw e
+    }
 
     try {
       val logicalRelation = LogicalRelation(relation, 
toAttributes(relation.schema), None, false)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
index 47a6143bad1d..370c118de9a9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.api.java.{UDF1, UDF2, UDF23Test}
 import org.apache.spark.sql.catalyst.expressions.{Coalesce, Literal, UnsafeRow}
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
 import 
org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
 import org.apache.spark.sql.expressions.SparkUserDefinedFunction
 import org.apache.spark.sql.functions._
@@ -926,6 +927,25 @@ class QueryCompilationErrorsSuite
       })
     }
   }
+
+  test("Catch and log errors when failing to write to external data source") {
+    val password = "MyPassWord"
+    val token = "MyToken"
+    val value = "value"
+    val options = Map("password" -> password, "token" -> token, "key" -> value)
+    val query = spark.range(10).logicalPlan
+    val cmd = SaveIntoDataSourceCommand(query, null, options, 
SaveMode.Overwrite)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        cmd.run(spark)
+      },
+      condition = "DATA_SOURCE_EXTERNAL_ERROR",
+      sqlState = "KD00F",
+      parameters = Map.empty
+    )
+  }
+
 }
 
 class MyCastToString extends SparkUserDefinedFunction(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to