This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c5e93eb6631 [SPARK-40207][SQL] Specify the column name when the data
type is not supported by datasource
c5e93eb6631 is described below
commit c5e93eb6631b2620908cef0fa3b100f6e3569d68
Author: yikf <[email protected]>
AuthorDate: Tue Aug 30 19:05:23 2022 +0800
[SPARK-40207][SQL] Specify the column name when the data type is not
supported by datasource
### What changes were proposed in this pull request?
Currently, if the data type is not supported by the data source, the
exception message thrown does not contain the column name, which makes it
harder to locate the problem.
This is a minor fix that aims to specify the column name when the data type
is not supported by the data source.
### Why are the changes needed?
More explicit error messages
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes #37574 from Yikf/void-type.
Authored-by: yikf <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
---
.../org/apache/spark/sql/avro/AvroSuite.scala | 6 +-
.../spark/sql/errors/QueryCompilationErrors.scala | 5 +-
.../spark/sql/FileBasedDataSourceSuite.scala | 68 +++++++++++++---------
.../spark/sql/hive/execution/HiveDDLSuite.scala | 4 +-
.../spark/sql/hive/orc/HiveOrcSourceSuite.scala | 16 +++--
5 files changed, 60 insertions(+), 39 deletions(-)
diff --git
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index bfdeb11fd8a..5e161948932 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -1181,14 +1181,16 @@ abstract class AvroSuite
sql("select interval 1
days").write.format("avro").mode("overwrite").save(tempDir)
}.getMessage
assert(msg.contains("Cannot save interval data type into external
storage.") ||
- msg.contains("AVRO data source does not support interval data
type."))
+ msg.contains("Column `INTERVAL '1' DAY` has a data type of interval
day, " +
+ "which is not supported by Avro."))
msg = intercept[AnalysisException] {
spark.udf.register("testType", () => new IntervalData())
sql("select
testType()").write.format("avro").mode("overwrite").save(tempDir)
}.getMessage
assert(msg.toLowerCase(Locale.ROOT)
- .contains(s"avro data source does not support interval data type."))
+ .contains("column `testtype()` has a data type of interval, " +
+ "which is not supported by avro."))
}
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 834e0e6b214..20c3c81b250 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1190,8 +1190,9 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase {
}
def dataTypeUnsupportedByDataSourceError(format: String, field:
StructField): Throwable = {
- new AnalysisException(
- s"$format data source does not support ${field.dataType.catalogString}
data type.")
+ new AnalysisException(s"Column `${field.name}` has a data type of " +
+ s"${field.dataType.catalogString}, which is not supported by $format."
+ )
}
def failToResolveDataSourceForTableError(table: CatalogTable, key: String):
Throwable = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 15d367dba88..98cb54ccbbc 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -257,38 +257,44 @@ class FileBasedDataSourceSuite extends QueryTest
// Text file format only supports string type
test("SPARK-24691 error handling for unsupported types - text") {
withTempDir { dir =>
+ def validateErrorMessage(msg: String, column: String, dt: String,
format: String): Unit = {
+ val excepted = s"Column `$column` has a data type of $dt, " +
+ s"which is not supported by $format."
+ assert(msg.contains(excepted))
+ }
+
// write path
val textDir = new File(dir, "text").getCanonicalPath
var msg = intercept[AnalysisException] {
Seq(1).toDF.write.text(textDir)
}.getMessage
- assert(msg.contains("Text data source does not support int data type"))
+ validateErrorMessage(msg, "value", "int", "Text")
msg = intercept[AnalysisException] {
Seq(1.2).toDF.write.text(textDir)
}.getMessage
- assert(msg.contains("Text data source does not support double data
type"))
+ validateErrorMessage(msg, "value", "double", "Text")
msg = intercept[AnalysisException] {
Seq(true).toDF.write.text(textDir)
}.getMessage
- assert(msg.contains("Text data source does not support boolean data
type"))
+ validateErrorMessage(msg, "value", "boolean", "Text")
msg = intercept[AnalysisException] {
Seq(1).toDF("a").selectExpr("struct(a)").write.text(textDir)
}.getMessage
- assert(msg.contains("Text data source does not support struct<a:int>
data type"))
+ validateErrorMessage(msg, "struct(a)", "struct<a:int>", "Text")
msg = intercept[AnalysisException] {
Seq((Map("Tesla" ->
3))).toDF("cars").write.mode("overwrite").text(textDir)
}.getMessage
- assert(msg.contains("Text data source does not support map<string,int>
data type"))
+ validateErrorMessage(msg, "cars", "map<string,int>", "Text")
msg = intercept[AnalysisException] {
Seq((Array("Tesla", "Chevy", "Ford"))).toDF("brands")
.write.mode("overwrite").text(textDir)
}.getMessage
- assert(msg.contains("Text data source does not support array<string>
data type"))
+ validateErrorMessage(msg, "brands", "array<string>", "Text")
// read path
Seq("aaa").toDF.write.mode("overwrite").text(textDir)
@@ -296,19 +302,19 @@ class FileBasedDataSourceSuite extends QueryTest
val schema = StructType(StructField("a", IntegerType, true) :: Nil)
spark.read.schema(schema).text(textDir).collect()
}.getMessage
- assert(msg.contains("Text data source does not support int data type"))
+ validateErrorMessage(msg, "a", "int", "Text")
msg = intercept[AnalysisException] {
val schema = StructType(StructField("a", DoubleType, true) :: Nil)
spark.read.schema(schema).text(textDir).collect()
}.getMessage
- assert(msg.contains("Text data source does not support double data
type"))
+ validateErrorMessage(msg, "a", "double", "Text")
msg = intercept[AnalysisException] {
val schema = StructType(StructField("a", BooleanType, true) :: Nil)
spark.read.schema(schema).text(textDir).collect()
}.getMessage
- assert(msg.contains("Text data source does not support boolean data
type"))
+ validateErrorMessage(msg, "a", "boolean", "Text")
}
}
@@ -319,56 +325,62 @@ class FileBasedDataSourceSuite extends QueryTest
// parquet -> R/W: Interval, Null
test("SPARK-24204 error handling for unsupported Array/Map/Struct types -
csv") {
withTempDir { dir =>
+ def validateErrorMessage(msg: String, column: String, dt: String,
format: String): Unit = {
+ val excepted = s"Column `$column` has a data type of $dt, " +
+ s"which is not supported by $format."
+ assert(msg.contains(excepted))
+ }
+
val csvDir = new File(dir, "csv").getCanonicalPath
var msg = intercept[AnalysisException] {
Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a,
b)").write.csv(csvDir)
}.getMessage
- assert(msg.contains("CSV data source does not support
struct<a:int,b:string> data type"))
+ validateErrorMessage(msg, "struct(a, b)", "struct<a:int,b:string>",
"CSV")
msg = intercept[AnalysisException] {
val schema = StructType.fromDDL("a struct<b: Int>")
spark.range(1).write.mode("overwrite").csv(csvDir)
spark.read.schema(schema).csv(csvDir).collect()
}.getMessage
- assert(msg.contains("CSV data source does not support struct<b:int> data
type"))
+ validateErrorMessage(msg, "a", "struct<b:int>", "CSV")
msg = intercept[AnalysisException] {
Seq((1, Map("Tesla" -> 3))).toDF("id",
"cars").write.mode("overwrite").csv(csvDir)
}.getMessage
- assert(msg.contains("CSV data source does not support map<string,int>
data type"))
+ validateErrorMessage(msg, "cars", "map<string,int>", "CSV")
msg = intercept[AnalysisException] {
val schema = StructType.fromDDL("a map<int, int>")
spark.range(1).write.mode("overwrite").csv(csvDir)
spark.read.schema(schema).csv(csvDir).collect()
}.getMessage
- assert(msg.contains("CSV data source does not support map<int,int> data
type"))
+ validateErrorMessage(msg, "a", "map<int,int>", "CSV")
msg = intercept[AnalysisException] {
Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands")
.write.mode("overwrite").csv(csvDir)
}.getMessage
- assert(msg.contains("CSV data source does not support array<string> data
type"))
+ validateErrorMessage(msg, "brands", "array<string>", "CSV")
msg = intercept[AnalysisException] {
val schema = StructType.fromDDL("a array<int>")
spark.range(1).write.mode("overwrite").csv(csvDir)
spark.read.schema(schema).csv(csvDir).collect()
}.getMessage
- assert(msg.contains("CSV data source does not support array<int> data
type"))
+ validateErrorMessage(msg, "a", "array<int>", "CSV")
msg = intercept[AnalysisException] {
Seq((1, new TestUDT.MyDenseVector(Array(0.25, 2.25,
4.25)))).toDF("id", "vectors")
.write.mode("overwrite").csv(csvDir)
}.getMessage
- assert(msg.contains("CSV data source does not support array<double> data
type"))
+ validateErrorMessage(msg, "vectors", "array<double>", "CSV")
msg = intercept[AnalysisException] {
val schema = StructType(StructField("a", new
TestUDT.MyDenseVectorUDT(), true) :: Nil)
spark.range(1).write.mode("overwrite").csv(csvDir)
spark.read.schema(schema).csv(csvDir).collect()
}.getMessage
- assert(msg.contains("CSV data source does not support array<double> data
type."))
+ validateErrorMessage(msg, "a", "array<double>", "CSV")
}
}
@@ -382,9 +394,9 @@ class FileBasedDataSourceSuite extends QueryTest
} else {
""
}
- def validateErrorMessage(msg: String): Unit = {
+ def validateErrorMessage(msg: String, format: String): Unit = {
val msg1 = "cannot save interval data type into external storage."
- val msg2 = "data source does not support interval data type."
+ val msg2 = s"column `a` has a data type of interval, which is not
supported by $format."
assert(msg.toLowerCase(Locale.ROOT).contains(msg1) ||
msg.toLowerCase(Locale.ROOT).contains(msg2))
}
@@ -397,7 +409,7 @@ class FileBasedDataSourceSuite extends QueryTest
val msg = intercept[AnalysisException] {
sql("select interval 1
days").write.format(format).mode("overwrite").save(tempDir)
}.getMessage
- validateErrorMessage(msg)
+ validateErrorMessage(msg, format)
}
// read path
@@ -407,14 +419,14 @@ class FileBasedDataSourceSuite extends QueryTest
spark.range(1).write.format(format).mode("overwrite").save(tempDir)
spark.read.schema(schema).format(format).load(tempDir).collect()
}.getMessage
- validateErrorMessage(msg)
+ validateErrorMessage(msg, format)
msg = intercept[AnalysisException] {
val schema = StructType(StructField("a", new IntervalUDT(),
true) :: Nil)
spark.range(1).write.format(format).mode("overwrite").save(tempDir)
spark.read.schema(schema).format(format).load(tempDir).collect()
}.getMessage
- validateErrorMessage(msg)
+ validateErrorMessage(msg, format)
}
}
}
@@ -429,8 +441,8 @@ class FileBasedDataSourceSuite extends QueryTest
} else {
""
}
- def errorMessage(format: String): String = {
- s"$format data source does not support void data type."
+ def errorMessage(format: String, column: String): String = {
+ s"column `$column` has a data type of void, which is not supported by
$format."
}
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
withTempDir { dir =>
@@ -442,14 +454,14 @@ class FileBasedDataSourceSuite extends QueryTest
sql("select
null").write.format(format).mode("overwrite").save(tempDir)
}.getMessage
assert(msg.toLowerCase(Locale.ROOT)
- .contains(errorMessage(format)))
+ .contains(errorMessage(format, "null")))
msg = intercept[AnalysisException] {
spark.udf.register("testType", () => new NullData())
sql("select
testType()").write.format(format).mode("overwrite").save(tempDir)
}.getMessage
assert(msg.toLowerCase(Locale.ROOT)
- .contains(errorMessage(format)))
+ .contains(errorMessage(format, "testtype()")))
// read path
msg = intercept[AnalysisException] {
@@ -458,7 +470,7 @@ class FileBasedDataSourceSuite extends QueryTest
spark.read.schema(schema).format(format).load(tempDir).collect()
}.getMessage
assert(msg.toLowerCase(Locale.ROOT)
- .contains(errorMessage(format)))
+ .contains(errorMessage(format, "a")))
msg = intercept[AnalysisException] {
val schema = StructType(StructField("a", new NullUDT(), true) ::
Nil)
@@ -466,7 +478,7 @@ class FileBasedDataSourceSuite extends QueryTest
spark.read.schema(schema).format(format).load(tempDir).collect()
}.getMessage
assert(msg.toLowerCase(Locale.ROOT)
- .contains(errorMessage(format)))
+ .contains(errorMessage(format, "a")))
}
}
}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 363db9badaf..f1bb8d30eed 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2379,7 +2379,7 @@ class HiveDDLSuite
withTable("t1", "t2", "t3") {
assertAnalysisError(
"CREATE TABLE t1 USING PARQUET AS SELECT NULL AS null_col",
- "Parquet data source does not support void data type")
+ "Column `null_col` has a data type of void, which is not supported by
Parquet.")
assertAnalysisError(
"CREATE TABLE t2 STORED AS PARQUET AS SELECT null as null_col",
@@ -2393,7 +2393,7 @@ class HiveDDLSuite
withTable("t1", "t2", "t3", "t4") {
assertAnalysisError(
"CREATE TABLE t1 (v VOID) USING PARQUET",
- "Parquet data source does not support void data type")
+ "Column `v` has a data type of void, which is not supported by
Parquet.")
assertAnalysisError(
"CREATE TABLE t2 (v VOID) STORED AS PARQUET",
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
index 574281a6577..61e8ede9d3b 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
@@ -112,22 +112,28 @@ class HiveOrcSourceSuite extends OrcSuite with
TestHiveSingleton {
withTempDir { dir =>
val orcDir = new File(dir, "orc").getCanonicalPath
+ def validateErrorMessage(msg: String, column: String, dt: String,
format: String): Unit = {
+ val excepted = s"Column `$column` has a data type of $dt, " +
+ s"which is not supported by $format."
+ assert(msg.contains(excepted))
+ }
+
// write path
var msg = intercept[AnalysisException] {
sql("select interval 1 days").write.mode("overwrite").orc(orcDir)
}.getMessage
- assert(msg.contains("ORC data source does not support interval day data
type"))
+ validateErrorMessage(msg, "INTERVAL '1' DAY", "interval day", "ORC")
msg = intercept[AnalysisException] {
sql("select null").write.mode("overwrite").orc(orcDir)
}.getMessage
- assert(msg.contains("ORC data source does not support void data type."))
+ validateErrorMessage(msg, "NULL", "void", "ORC")
msg = intercept[AnalysisException] {
spark.udf.register("testType", () => new IntervalData())
sql("select testType()").write.mode("overwrite").orc(orcDir)
}.getMessage
- assert(msg.contains("ORC data source does not support interval data
type."))
+ validateErrorMessage(msg, "testType()", "interval", "ORC")
// read path
msg = intercept[AnalysisException] {
@@ -135,14 +141,14 @@ class HiveOrcSourceSuite extends OrcSuite with
TestHiveSingleton {
spark.range(1).write.mode("overwrite").orc(orcDir)
spark.read.schema(schema).orc(orcDir).collect()
}.getMessage
- assert(msg.contains("ORC data source does not support interval data
type."))
+ validateErrorMessage(msg, "a", "interval", "ORC")
msg = intercept[AnalysisException] {
val schema = StructType(StructField("a", new IntervalUDT(), true) ::
Nil)
spark.range(1).write.mode("overwrite").orc(orcDir)
spark.read.schema(schema).orc(orcDir).collect()
}.getMessage
- assert(msg.contains("ORC data source does not support interval data
type."))
+ validateErrorMessage(msg, "a", "interval", "ORC")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]