This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bbe6d573d95e [SPARK-49392][SQL] Catch errors when failing to write to
external data source
bbe6d573d95e is described below
commit bbe6d573d95edf800b4a9de1b16a0ccaffabf1a6
Author: Uros Bojanic <[email protected]>
AuthorDate: Fri Sep 13 19:04:49 2024 +0200
[SPARK-49392][SQL] Catch errors when failing to write to external data
source
### What changes were proposed in this pull request?
Catch various exceptions thrown by the data source API, when failing to
write to a data source, and rethrow `externalDataSourceException` to provide a
more friendly error message for the user.
### Why are the changes needed?
To catch non-fatal exceptions when failing to save the results of a query
into an external data source (for example: `com.crealytics.spark.excel`).
### Does this PR introduce _any_ user-facing change?
Yes, error messages when failing to write to an external data source should
now be more user-friendly.
### How was this patch tested?
New test in `ExplainSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47873 from uros-db/external-data-source.
Authored-by: Uros Bojanic <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 6 +++++
.../src/main/resources/error/error-states.json | 6 +++++
.../spark/sql/errors/QueryCompilationErrors.scala | 8 +++++++
.../datasources/SaveIntoDataSourceCommand.scala | 26 +++++++++++++++++++---
.../sql/errors/QueryCompilationErrorsSuite.scala | 20 +++++++++++++++++
5 files changed, 63 insertions(+), 3 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 229da4fa17de..0a9dcd52ea83 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1045,6 +1045,12 @@
],
"sqlState" : "42710"
},
+ "DATA_SOURCE_EXTERNAL_ERROR" : {
+ "message" : [
+ "Encountered error when saving to external data source."
+ ],
+ "sqlState" : "KD00F"
+ },
"DATA_SOURCE_NOT_EXIST" : {
"message" : [
"Data source '<provider>' not found. Please make sure the data source is registered."
diff --git a/common/utils/src/main/resources/error/error-states.json
b/common/utils/src/main/resources/error/error-states.json
index c369db3f6505..edba6e1d4321 100644
--- a/common/utils/src/main/resources/error/error-states.json
+++ b/common/utils/src/main/resources/error/error-states.json
@@ -7417,6 +7417,12 @@
"standard": "N",
"usedBy": ["Databricks"]
},
+ "KD00F": {
+ "description": "external data source failure",
+ "origin": "Databricks",
+ "standard": "N",
+ "usedBy": ["Databricks"]
+ },
"P0000": {
"description": "procedural logic error",
"origin": "PostgreSQL",
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index fa8ea2f5289f..e4c8c76e958f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3959,6 +3959,14 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
messageParameters = Map("provider" -> name))
}
+ def externalDataSourceException(cause: Throwable): Throwable = {
+ new AnalysisException(
+ errorClass = "DATA_SOURCE_EXTERNAL_ERROR",
+ messageParameters = Map(),
+ cause = Some(cause)
+ )
+ }
+
def foundMultipleDataSources(provider: String): Throwable = {
new AnalysisException(
errorClass = "FOUND_MULTIPLE_DATA_SOURCES",
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
index 5423232db429..e44f1d35e9cd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
@@ -19,12 +19,14 @@ package org.apache.spark.sql.execution.datasources
import scala.util.control.NonFatal
+import org.apache.spark.SparkThrowable
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.sources.CreatableRelationProvider
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
/**
* Saves the results of `query` in to a data source.
@@ -44,8 +46,26 @@ case class SaveIntoDataSourceCommand(
override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
override def run(sparkSession: SparkSession): Seq[Row] = {
- val relation = dataSource.createRelation(
- sparkSession.sqlContext, mode, options, Dataset.ofRows(sparkSession, query))
+ var relation: BaseRelation = null
+
+ try {
+ relation = dataSource.createRelation(
+ sparkSession.sqlContext, mode, options, Dataset.ofRows(sparkSession, query))
+ } catch {
+ case e: SparkThrowable =>
// We should avoid wrapping `SparkThrowable` exceptions into another `AnalysisException`.
+ throw e
case e @ (_: NullPointerException | _: MatchError | _: ArrayIndexOutOfBoundsException) =>
+ // These are some of the exceptions thrown by the data source API. We catch these
+ // exceptions here and rethrow QueryCompilationErrors.externalDataSourceException to
+ // provide a more friendly error message for the user. This list is not exhaustive.
+ throw QueryCompilationErrors.externalDataSourceException(e)
+ case e: Throwable =>
+ // For other exceptions, just rethrow it, since we don't have enough information to
+ // provide a better error message for the user at the moment. We may want to further
+ // improve the error message handling in the future.
+ throw e
+ }
try {
val logicalRelation = LogicalRelation(relation, toAttributes(relation.schema), None, false)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
index 47a6143bad1d..370c118de9a9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.api.java.{UDF1, UDF2, UDF23Test}
import org.apache.spark.sql.catalyst.expressions.{Coalesce, Literal, UnsafeRow}
import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
import org.apache.spark.sql.expressions.SparkUserDefinedFunction
import org.apache.spark.sql.functions._
@@ -926,6 +927,25 @@ class QueryCompilationErrorsSuite
})
}
}
+
+ test("Catch and log errors when failing to write to external data source") {
+ val password = "MyPassWord"
+ val token = "MyToken"
+ val value = "value"
+ val options = Map("password" -> password, "token" -> token, "key" -> value)
+ val query = spark.range(10).logicalPlan
+ val cmd = SaveIntoDataSourceCommand(query, null, options, SaveMode.Overwrite)
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ cmd.run(spark)
+ },
+ condition = "DATA_SOURCE_EXTERNAL_ERROR",
+ sqlState = "KD00F",
+ parameters = Map.empty
+ )
+ }
+
}
class MyCastToString extends SparkUserDefinedFunction(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]