This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 94a6f2afa75 [SPARK-42236][SQL] Refine `NULLABLE_ARRAY_OR_MAP_ELEMENT`
94a6f2afa75 is described below

commit 94a6f2afa758ad375980e92a876158e1900ff53b
Author: itholic <haejoon....@databricks.com>
AuthorDate: Tue Jan 31 19:35:57 2023 +0300

    [SPARK-42236][SQL] Refine `NULLABLE_ARRAY_OR_MAP_ELEMENT`
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to refine `NULLABLE_ARRAY_OR_MAP_ELEMENT` into a main/sub 
error class structure.
    
    `NOT_NULL_CONSTRAINT_VIOLATION`
    - `ARRAY_ELEMENT`
    - `MAP_VALUE`
    
    ### Why are the changes needed?
    
    The name of the error class is misleading, and we can make it more generic so 
that we can reuse it in various situations.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Updated & added UTs.
    
    Closes #39804 from itholic/NULLABLE_ARRAY_OR_MAP_ELEMENT.
    
    Authored-by: itholic <haejoon....@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
    (cherry picked from commit 1cba3b98160ad9d7cdf29e84ff0191598177835c)
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/protobuf/ProtobufDeserializer.scala  |  5 +++--
 core/src/main/resources/error/error-classes.json   | 24 ++++++++++++++++------
 .../plans/logical/basicLogicalOperators.scala      |  4 ++--
 .../spark/sql/errors/QueryCompilationErrors.scala  | 16 +++++++++++++--
 .../apache/spark/sql/DataFrameToSchemaSuite.scala  | 14 ++++++++++++-
 5 files changed, 50 insertions(+), 13 deletions(-)

diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
index 224e22c0f52..37278fab8a3 100644
--- 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
+++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
@@ -91,7 +91,8 @@ private[sql] class ProtobufDeserializer(
         val element = iterator.next()
         if (element == null) {
           if (!containsNull) {
-            throw 
QueryCompilationErrors.nullableArrayOrMapElementError(protoElementPath)
+            throw 
QueryCompilationErrors.notNullConstraintViolationArrayElementError(
+              protoElementPath)
           } else {
             elementUpdater.setNullAt(i)
           }
@@ -129,7 +130,7 @@ private[sql] class ProtobufDeserializer(
             keyWriter(keyUpdater, i, field.getField(keyField))
             if (field.getField(valueField) == null) {
               if (!valueContainsNull) {
-                throw 
QueryCompilationErrors.nullableArrayOrMapElementError(protoPath)
+                throw 
QueryCompilationErrors.notNullConstraintViolationMapValueError(protoPath)
               } else {
                 valueUpdater.setNullAt(i)
               }
diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index d881e48d604..b70f03b06a6 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1004,6 +1004,24 @@
       "Operation <operation> is not allowed for <tableIdentWithDB> because it 
is not a partitioned table."
     ]
   },
+  "NOT_NULL_CONSTRAINT_VIOLATION" : {
+    "message" : [
+      "Assigning a NULL is not allowed here."
+    ],
+    "subClass" : {
+      "ARRAY_ELEMENT" : {
+        "message" : [
+          "The array <columnPath> is defined to contain only elements that are 
NOT NULL."
+        ]
+      },
+      "MAP_VALUE" : {
+        "message" : [
+          "The map <columnPath> is defined to contain only values that are NOT 
NULL."
+        ]
+      }
+    },
+    "sqlState" : "42000"
+  },
   "NO_HANDLER_FOR_UDAF" : {
     "message" : [
       "No handler for UDAF '<functionName>'. Use 
sparkSession.udf.register(...) instead."
@@ -1019,12 +1037,6 @@
       "UDF class <className> doesn't implement any UDF interface."
     ]
   },
-  "NULLABLE_ARRAY_OR_MAP_ELEMENT" : {
-    "message" : [
-      "Array or map at <columnPath> contains nullable element while it's 
required to be non-nullable."
-    ],
-    "sqlState" : "42000"
-  },
   "NULLABLE_COLUMN_OR_FIELD" : {
     "message" : [
       "Column or field <name> is nullable while it's required to be 
non-nullable."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index a8dfb8fbd84..74929bf5d79 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -131,7 +131,7 @@ object Project {
 
       case (ArrayType(et, containsNull), expected: ArrayType) =>
         if (containsNull & !expected.containsNull) {
-          throw 
QueryCompilationErrors.nullableArrayOrMapElementError(columnPath)
+          throw 
QueryCompilationErrors.notNullConstraintViolationArrayElementError(columnPath)
         }
         val param = NamedLambdaVariable("x", et, containsNull)
         val reconciledElement = reconcileColumnType(
@@ -141,7 +141,7 @@ object Project {
 
       case (MapType(kt, vt, valueContainsNull), expected: MapType) =>
         if (valueContainsNull & !expected.valueContainsNull) {
-          throw 
QueryCompilationErrors.nullableArrayOrMapElementError(columnPath)
+          throw 
QueryCompilationErrors.notNullConstraintViolationMapValueError(columnPath)
         }
         val keyParam = NamedLambdaVariable("key", kt, nullable = false)
         val valueParam = NamedLambdaVariable("value", vt, valueContainsNull)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index c43806449a8..ff53588a215 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3180,9 +3180,21 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       messageParameters = Map("name" -> toSQLId(name)))
   }
 
-  def nullableArrayOrMapElementError(path: Seq[String]): Throwable = {
+  def notNullConstraintViolationArrayElementError(path: Seq[String]): 
Throwable = {
     new AnalysisException(
-      errorClass = "NULLABLE_ARRAY_OR_MAP_ELEMENT",
+      errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.ARRAY_ELEMENT",
+      messageParameters = Map("columnPath" -> toSQLId(path)))
+  }
+
+  def notNullConstraintViolationMapValueError(path: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.MAP_VALUE",
+      messageParameters = Map("columnPath" -> toSQLId(path)))
+  }
+
+  def notNullConstraintViolationStructFieldError(path: Seq[String]): Throwable 
= {
+    new AnalysisException(
+      errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.STRUCT_FIELD",
       messageParameters = Map("columnPath" -> toSQLId(path)))
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
index 6a3401073a0..5bbaebbd9ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
@@ -262,7 +262,7 @@ class DataFrameToSchemaSuite extends QueryTest with 
SharedSparkSession {
     val e = intercept[SparkThrowable](data.to(schema))
     checkError(
       exception = e,
-      errorClass = "NULLABLE_ARRAY_OR_MAP_ELEMENT",
+      errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.ARRAY_ELEMENT",
       parameters = Map("columnPath" -> "`arr`"))
   }
 
@@ -320,4 +320,16 @@ class DataFrameToSchemaSuite extends QueryTest with 
SharedSparkSession {
     assert(df.schema == schema)
     checkAnswer(df, Row(Map("a" -> Row("b", "a"))))
   }
+
+  test("map value: incompatible map nullability") {
+    val m = MapType(StringType, StringType, valueContainsNull = false)
+    val schema = new StructType().add("map", m, nullable = false)
+    val data = Seq("a" -> null).toDF("i", "j").select(map($"i", 
$"j").as("map"))
+    
assert(data.schema.fields(0).dataType.asInstanceOf[MapType].valueContainsNull)
+    val e = intercept[SparkThrowable](data.to(schema))
+    checkError(
+      exception = e,
+      errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.MAP_VALUE",
+      parameters = Map("columnPath" -> "`map`"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to