This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c5e93eb6631 [SPARK-40207][SQL] Specify the column name when the data 
type is not supported by datasource
c5e93eb6631 is described below

commit c5e93eb6631b2620908cef0fa3b100f6e3569d68
Author: yikf <[email protected]>
AuthorDate: Tue Aug 30 19:05:23 2022 +0800

    [SPARK-40207][SQL] Specify the column name when the data type is not 
supported by datasource
    
    ### What changes were proposed in this pull request?
    
    Currently, if the data type is not supported by the data source, the 
exception message thrown does not contain the column name, which makes it 
harder to locate the problem.
    
    This is a minor fix that aims to specify the column name when the data 
type is not supported by the data source.
    
    ### Why are the changes needed?
    
    More explicit error messages
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    GA
    
    Closes #37574 from Yikf/void-type.
    
    Authored-by: yikf <[email protected]>
    Signed-off-by: Yuming Wang <[email protected]>
---
 .../org/apache/spark/sql/avro/AvroSuite.scala      |  6 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  5 +-
 .../spark/sql/FileBasedDataSourceSuite.scala       | 68 +++++++++++++---------
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |  4 +-
 .../spark/sql/hive/orc/HiveOrcSourceSuite.scala    | 16 +++--
 5 files changed, 60 insertions(+), 39 deletions(-)

diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index bfdeb11fd8a..5e161948932 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -1181,14 +1181,16 @@ abstract class AvroSuite
           sql("select interval 1 
days").write.format("avro").mode("overwrite").save(tempDir)
         }.getMessage
         assert(msg.contains("Cannot save interval data type into external 
storage.") ||
-          msg.contains("AVRO data source does not support interval data 
type."))
+          msg.contains("Column `INTERVAL '1' DAY` has a data type of interval 
day, " +
+            "which is not supported by Avro."))
 
         msg = intercept[AnalysisException] {
           spark.udf.register("testType", () => new IntervalData())
           sql("select 
testType()").write.format("avro").mode("overwrite").save(tempDir)
         }.getMessage
         assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"avro data source does not support interval data type."))
+          .contains("column `testtype()` has a data type of interval, " +
+            "which is not supported by avro."))
       }
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 834e0e6b214..20c3c81b250 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1190,8 +1190,9 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
   }
 
   def dataTypeUnsupportedByDataSourceError(format: String, field: 
StructField): Throwable = {
-    new AnalysisException(
-      s"$format data source does not support ${field.dataType.catalogString} 
data type.")
+    new AnalysisException(s"Column `${field.name}` has a data type of " +
+      s"${field.dataType.catalogString}, which is not supported by $format."
+    )
   }
 
   def failToResolveDataSourceForTableError(table: CatalogTable, key: String): 
Throwable = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 15d367dba88..98cb54ccbbc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -257,38 +257,44 @@ class FileBasedDataSourceSuite extends QueryTest
   // Text file format only supports string type
   test("SPARK-24691 error handling for unsupported types - text") {
     withTempDir { dir =>
+      def validateErrorMessage(msg: String, column: String, dt: String, 
format: String): Unit = {
+        val excepted = s"Column `$column` has a data type of $dt, " +
+          s"which is not supported by $format."
+        assert(msg.contains(excepted))
+      }
+
       // write path
       val textDir = new File(dir, "text").getCanonicalPath
       var msg = intercept[AnalysisException] {
         Seq(1).toDF.write.text(textDir)
       }.getMessage
-      assert(msg.contains("Text data source does not support int data type"))
+      validateErrorMessage(msg, "value", "int", "Text")
 
       msg = intercept[AnalysisException] {
         Seq(1.2).toDF.write.text(textDir)
       }.getMessage
-      assert(msg.contains("Text data source does not support double data 
type"))
+      validateErrorMessage(msg, "value", "double", "Text")
 
       msg = intercept[AnalysisException] {
         Seq(true).toDF.write.text(textDir)
       }.getMessage
-      assert(msg.contains("Text data source does not support boolean data 
type"))
+      validateErrorMessage(msg, "value", "boolean", "Text")
 
       msg = intercept[AnalysisException] {
         Seq(1).toDF("a").selectExpr("struct(a)").write.text(textDir)
       }.getMessage
-      assert(msg.contains("Text data source does not support struct<a:int> 
data type"))
+      validateErrorMessage(msg, "struct(a)", "struct<a:int>", "Text")
 
       msg = intercept[AnalysisException] {
         Seq((Map("Tesla" -> 
3))).toDF("cars").write.mode("overwrite").text(textDir)
       }.getMessage
-      assert(msg.contains("Text data source does not support map<string,int> 
data type"))
+      validateErrorMessage(msg, "cars", "map<string,int>", "Text")
 
       msg = intercept[AnalysisException] {
         Seq((Array("Tesla", "Chevy", "Ford"))).toDF("brands")
           .write.mode("overwrite").text(textDir)
       }.getMessage
-      assert(msg.contains("Text data source does not support array<string> 
data type"))
+      validateErrorMessage(msg, "brands", "array<string>", "Text")
 
       // read path
       Seq("aaa").toDF.write.mode("overwrite").text(textDir)
@@ -296,19 +302,19 @@ class FileBasedDataSourceSuite extends QueryTest
         val schema = StructType(StructField("a", IntegerType, true) :: Nil)
         spark.read.schema(schema).text(textDir).collect()
       }.getMessage
-      assert(msg.contains("Text data source does not support int data type"))
+      validateErrorMessage(msg, "a", "int", "Text")
 
       msg = intercept[AnalysisException] {
         val schema = StructType(StructField("a", DoubleType, true) :: Nil)
         spark.read.schema(schema).text(textDir).collect()
       }.getMessage
-      assert(msg.contains("Text data source does not support double data 
type"))
+      validateErrorMessage(msg, "a", "double", "Text")
 
       msg = intercept[AnalysisException] {
         val schema = StructType(StructField("a", BooleanType, true) :: Nil)
         spark.read.schema(schema).text(textDir).collect()
       }.getMessage
-      assert(msg.contains("Text data source does not support boolean data 
type"))
+      validateErrorMessage(msg, "a", "boolean", "Text")
     }
   }
 
@@ -319,56 +325,62 @@ class FileBasedDataSourceSuite extends QueryTest
   //  parquet -> R/W: Interval, Null
   test("SPARK-24204 error handling for unsupported Array/Map/Struct types - 
csv") {
     withTempDir { dir =>
+      def validateErrorMessage(msg: String, column: String, dt: String, 
format: String): Unit = {
+        val excepted = s"Column `$column` has a data type of $dt, " +
+          s"which is not supported by $format."
+        assert(msg.contains(excepted))
+      }
+
       val csvDir = new File(dir, "csv").getCanonicalPath
       var msg = intercept[AnalysisException] {
         Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, 
b)").write.csv(csvDir)
       }.getMessage
-      assert(msg.contains("CSV data source does not support 
struct<a:int,b:string> data type"))
+      validateErrorMessage(msg, "struct(a, b)", "struct<a:int,b:string>", 
"CSV")
 
       msg = intercept[AnalysisException] {
         val schema = StructType.fromDDL("a struct<b: Int>")
         spark.range(1).write.mode("overwrite").csv(csvDir)
         spark.read.schema(schema).csv(csvDir).collect()
       }.getMessage
-      assert(msg.contains("CSV data source does not support struct<b:int> data 
type"))
+      validateErrorMessage(msg, "a", "struct<b:int>", "CSV")
 
       msg = intercept[AnalysisException] {
         Seq((1, Map("Tesla" -> 3))).toDF("id", 
"cars").write.mode("overwrite").csv(csvDir)
       }.getMessage
-      assert(msg.contains("CSV data source does not support map<string,int> 
data type"))
+      validateErrorMessage(msg, "cars", "map<string,int>", "CSV")
 
       msg = intercept[AnalysisException] {
         val schema = StructType.fromDDL("a map<int, int>")
         spark.range(1).write.mode("overwrite").csv(csvDir)
         spark.read.schema(schema).csv(csvDir).collect()
       }.getMessage
-      assert(msg.contains("CSV data source does not support map<int,int> data 
type"))
+      validateErrorMessage(msg, "a", "map<int,int>", "CSV")
 
       msg = intercept[AnalysisException] {
         Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands")
           .write.mode("overwrite").csv(csvDir)
       }.getMessage
-      assert(msg.contains("CSV data source does not support array<string> data 
type"))
+      validateErrorMessage(msg, "brands", "array<string>", "CSV")
 
       msg = intercept[AnalysisException] {
          val schema = StructType.fromDDL("a array<int>")
          spark.range(1).write.mode("overwrite").csv(csvDir)
          spark.read.schema(schema).csv(csvDir).collect()
        }.getMessage
-      assert(msg.contains("CSV data source does not support array<int> data 
type"))
+      validateErrorMessage(msg, "a", "array<int>", "CSV")
 
       msg = intercept[AnalysisException] {
         Seq((1, new TestUDT.MyDenseVector(Array(0.25, 2.25, 
4.25)))).toDF("id", "vectors")
           .write.mode("overwrite").csv(csvDir)
       }.getMessage
-      assert(msg.contains("CSV data source does not support array<double> data 
type"))
+      validateErrorMessage(msg, "vectors", "array<double>", "CSV")
 
       msg = intercept[AnalysisException] {
         val schema = StructType(StructField("a", new 
TestUDT.MyDenseVectorUDT(), true) :: Nil)
         spark.range(1).write.mode("overwrite").csv(csvDir)
         spark.read.schema(schema).csv(csvDir).collect()
       }.getMessage
-      assert(msg.contains("CSV data source does not support array<double> data 
type."))
+      validateErrorMessage(msg, "a", "array<double>", "CSV")
     }
   }
 
@@ -382,9 +394,9 @@ class FileBasedDataSourceSuite extends QueryTest
         } else {
           ""
         }
-        def validateErrorMessage(msg: String): Unit = {
+        def validateErrorMessage(msg: String, format: String): Unit = {
           val msg1 = "cannot save interval data type into external storage."
-          val msg2 = "data source does not support interval data type."
+          val msg2 = s"column `a` has a data type of interval, which is not 
supported by $format."
           assert(msg.toLowerCase(Locale.ROOT).contains(msg1) ||
             msg.toLowerCase(Locale.ROOT).contains(msg2))
         }
@@ -397,7 +409,7 @@ class FileBasedDataSourceSuite extends QueryTest
             val msg = intercept[AnalysisException] {
               sql("select interval 1 
days").write.format(format).mode("overwrite").save(tempDir)
             }.getMessage
-            validateErrorMessage(msg)
+            validateErrorMessage(msg, format)
           }
 
           // read path
@@ -407,14 +419,14 @@ class FileBasedDataSourceSuite extends QueryTest
               
spark.range(1).write.format(format).mode("overwrite").save(tempDir)
               spark.read.schema(schema).format(format).load(tempDir).collect()
             }.getMessage
-            validateErrorMessage(msg)
+            validateErrorMessage(msg, format)
 
             msg = intercept[AnalysisException] {
               val schema = StructType(StructField("a", new IntervalUDT(), 
true) :: Nil)
               
spark.range(1).write.format(format).mode("overwrite").save(tempDir)
               spark.read.schema(schema).format(format).load(tempDir).collect()
             }.getMessage
-            validateErrorMessage(msg)
+            validateErrorMessage(msg, format)
           }
         }
       }
@@ -429,8 +441,8 @@ class FileBasedDataSourceSuite extends QueryTest
       } else {
         ""
       }
-      def errorMessage(format: String): String = {
-        s"$format data source does not support void data type."
+      def errorMessage(format: String, column: String): String = {
+        s"column `$column` has a data type of void, which is not supported by 
$format."
       }
       withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
         withTempDir { dir =>
@@ -442,14 +454,14 @@ class FileBasedDataSourceSuite extends QueryTest
               sql("select 
null").write.format(format).mode("overwrite").save(tempDir)
             }.getMessage
             assert(msg.toLowerCase(Locale.ROOT)
-              .contains(errorMessage(format)))
+              .contains(errorMessage(format, "null")))
 
             msg = intercept[AnalysisException] {
               spark.udf.register("testType", () => new NullData())
               sql("select 
testType()").write.format(format).mode("overwrite").save(tempDir)
             }.getMessage
             assert(msg.toLowerCase(Locale.ROOT)
-              .contains(errorMessage(format)))
+              .contains(errorMessage(format, "testtype()")))
 
             // read path
             msg = intercept[AnalysisException] {
@@ -458,7 +470,7 @@ class FileBasedDataSourceSuite extends QueryTest
               spark.read.schema(schema).format(format).load(tempDir).collect()
             }.getMessage
             assert(msg.toLowerCase(Locale.ROOT)
-              .contains(errorMessage(format)))
+              .contains(errorMessage(format, "a")))
 
             msg = intercept[AnalysisException] {
               val schema = StructType(StructField("a", new NullUDT(), true) :: 
Nil)
@@ -466,7 +478,7 @@ class FileBasedDataSourceSuite extends QueryTest
               spark.read.schema(schema).format(format).load(tempDir).collect()
             }.getMessage
             assert(msg.toLowerCase(Locale.ROOT)
-              .contains(errorMessage(format)))
+              .contains(errorMessage(format, "a")))
           }
         }
       }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 363db9badaf..f1bb8d30eed 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2379,7 +2379,7 @@ class HiveDDLSuite
     withTable("t1", "t2", "t3") {
       assertAnalysisError(
         "CREATE TABLE t1 USING PARQUET AS SELECT NULL AS null_col",
-        "Parquet data source does not support void data type")
+        "Column `null_col` has a data type of void, which is not supported by 
Parquet.")
 
       assertAnalysisError(
         "CREATE TABLE t2 STORED AS PARQUET AS SELECT null as null_col",
@@ -2393,7 +2393,7 @@ class HiveDDLSuite
     withTable("t1", "t2", "t3", "t4") {
       assertAnalysisError(
         "CREATE TABLE t1 (v VOID) USING PARQUET",
-        "Parquet data source does not support void data type")
+        "Column `v` has a data type of void, which is not supported by 
Parquet.")
 
       assertAnalysisError(
         "CREATE TABLE t2 (v VOID) STORED AS PARQUET",
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
index 574281a6577..61e8ede9d3b 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
@@ -112,22 +112,28 @@ class HiveOrcSourceSuite extends OrcSuite with 
TestHiveSingleton {
     withTempDir { dir =>
       val orcDir = new File(dir, "orc").getCanonicalPath
 
+      def validateErrorMessage(msg: String, column: String, dt: String, 
format: String): Unit = {
+        val excepted = s"Column `$column` has a data type of $dt, " +
+          s"which is not supported by $format."
+        assert(msg.contains(excepted))
+      }
+
       // write path
       var msg = intercept[AnalysisException] {
         sql("select interval 1 days").write.mode("overwrite").orc(orcDir)
       }.getMessage
-      assert(msg.contains("ORC data source does not support interval day data 
type"))
+      validateErrorMessage(msg, "INTERVAL '1' DAY", "interval day", "ORC")
 
       msg = intercept[AnalysisException] {
         sql("select null").write.mode("overwrite").orc(orcDir)
       }.getMessage
-      assert(msg.contains("ORC data source does not support void data type."))
+      validateErrorMessage(msg, "NULL", "void", "ORC")
 
       msg = intercept[AnalysisException] {
         spark.udf.register("testType", () => new IntervalData())
         sql("select testType()").write.mode("overwrite").orc(orcDir)
       }.getMessage
-      assert(msg.contains("ORC data source does not support interval data 
type."))
+      validateErrorMessage(msg, "testType()", "interval", "ORC")
 
       // read path
       msg = intercept[AnalysisException] {
@@ -135,14 +141,14 @@ class HiveOrcSourceSuite extends OrcSuite with 
TestHiveSingleton {
         spark.range(1).write.mode("overwrite").orc(orcDir)
         spark.read.schema(schema).orc(orcDir).collect()
       }.getMessage
-      assert(msg.contains("ORC data source does not support interval data 
type."))
+      validateErrorMessage(msg, "a", "interval", "ORC")
 
       msg = intercept[AnalysisException] {
         val schema = StructType(StructField("a", new IntervalUDT(), true) :: 
Nil)
         spark.range(1).write.mode("overwrite").orc(orcDir)
         spark.read.schema(schema).orc(orcDir).collect()
       }.getMessage
-      assert(msg.contains("ORC data source does not support interval data 
type."))
+      validateErrorMessage(msg, "a", "interval", "ORC")
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to