This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cb48c0e48ee [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps
cb48c0e48ee is described below

commit cb48c0e48eeff2b7b51176d0241491300e5aad6f
Author: aokolnychyi <aokolnyc...@apple.com>
AuthorDate: Mon Apr 3 15:21:07 2023 -0700

    [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps
    
    ### What changes were proposed in this pull request?
    
    This PR fixes `TableOutputResolver` to use correct column paths in error messages for arrays and maps.
    
    ### Why are the changes needed?
    
    These changes are needed to have accurate error messages when there is a type mismatch inside arrays and maps.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This PR comes with tests.
    
    Closes #40630 from aokolnychyi/spark-42997.
    
    Authored-by: aokolnychyi <aokolnyc...@apple.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../catalyst/analysis/TableOutputResolver.scala    |  18 +-
 .../catalyst/analysis/V2WriteAnalysisSuite.scala   | 224 +++++++++++++++++++++
 2 files changed, 234 insertions(+), 8 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 71f4eb2918c..919acbe4117 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -179,8 +179,9 @@ object TableOutputResolver {
       addError(s"Cannot write nullable elements to array of non-nulls: 
'${colPath.quoted}'")
       None
     } else {
-      val param = NamedLambdaVariable("x", inputType.elementType, 
inputType.containsNull)
-      val fakeAttr = AttributeReference("x", expectedType.elementType, 
expectedType.containsNull)()
+      val param = NamedLambdaVariable("element", inputType.elementType, 
inputType.containsNull)
+      val fakeAttr =
+        AttributeReference("element", expectedType.elementType, 
expectedType.containsNull)()
       val res = reorderColumnsByName(Seq(param), Seq(fakeAttr), conf, 
addError, colPath)
       if (res.length == 1) {
         val func = LambdaFunction(res.head, Seq(param))
@@ -203,16 +204,17 @@ object TableOutputResolver {
       addError(s"Cannot write nullable values to map of non-nulls: 
'${colPath.quoted}'")
       None
     } else {
-      val keyParam = NamedLambdaVariable("k", inputType.keyType, nullable = 
false)
-      val fakeKeyAttr = AttributeReference("k", expectedType.keyType, nullable 
= false)()
+      val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable = 
false)
+      val fakeKeyAttr = AttributeReference("key", expectedType.keyType, 
nullable = false)()
       val resKey = reorderColumnsByName(
-        Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath :+ "key")
+        Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath)
 
-      val valueParam = NamedLambdaVariable("v", inputType.valueType, 
inputType.valueContainsNull)
+      val valueParam =
+        NamedLambdaVariable("value", inputType.valueType, 
inputType.valueContainsNull)
       val fakeValueAttr =
-        AttributeReference("v", expectedType.valueType, 
expectedType.valueContainsNull)()
+        AttributeReference("value", expectedType.valueType, 
expectedType.valueContainsNull)()
       val resValue = reorderColumnsByName(
-        Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath :+ 
"value")
+        Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath)
 
       if (resKey.length == 1 && resValue.length == 1) {
         val keyFunc = LambdaFunction(resKey.head, Seq(keyParam))
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
index 69cd838cfb2..edcd6ec2368 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
@@ -634,6 +634,230 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     }
   }
 
+  test("SPARK-42997: extra fields in nested struct (byName)") {
+    checkExtraFieldsInNestedStruct(byNameResolution = true)
+  }
+
+  test("SPARK-42997: extra fields in nested struct (byPosition)") {
+    checkExtraFieldsInNestedStruct(byNameResolution = false)
+  }
+
+  private def checkExtraFieldsInNestedStruct(byNameResolution: Boolean): Unit 
= {
+    val table = TestRelation(Seq(
+      $"a".int,
+      $"b".struct($"n1".int, $"n2".struct($"dn1".int, $"dn2".int))))
+    val query = TestRelation(Seq(
+      $"a".int,
+      $"b".struct($"n1".int, $"n2".struct($"dn1".int, $"dn2".int, 
$"dn3".int))))
+
+    val parsedPlan = if (byNameResolution) byName(table, query) else 
byPosition(table, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      "Cannot write extra fields to struct 'b.n2': 'dn3'"))
+  }
+
+  test("SPARK-42997: extra fields in struct inside array (byName)") {
+    checkExtraFieldsInStructInsideArray(byNameResolution = true)
+  }
+
+  test("SPARK-42997: extra fields in struct inside array (byPosition)") {
+    checkExtraFieldsInStructInsideArray(byNameResolution = false)
+  }
+
+  private def checkExtraFieldsInStructInsideArray(byNameResolution: Boolean): 
Unit = {
+    val table = TestRelation(Seq(
+      $"a".int,
+      $"arr".array(new StructType().add("x", "int").add("y", "int"))))
+    val query = TestRelation(Seq(
+      $"a".int,
+      $"arr".array(new StructType().add("x", "int").add("y", "int").add("z", 
"int"))))
+
+    val parsedPlan = if (byNameResolution) byName(table, query) else 
byPosition(table, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      "Cannot write extra fields to struct 'arr.element': 'z'"))
+  }
+
+  test("SPARK-42997: extra fields in struct inside map key (byName)") {
+    checkExtraFieldsInStructInsideMapKey(byNameResolution = true)
+  }
+
+  test("SPARK-42997: extra fields in struct inside map key (byPosition)") {
+    checkExtraFieldsInStructInsideMapKey(byNameResolution = false)
+  }
+
+  private def checkExtraFieldsInStructInsideMapKey(byNameResolution: Boolean): 
Unit = {
+    val table = TestRelation(Seq(
+      $"a".int,
+      Symbol("m").map(
+        new StructType().add("x", "int").add("y", "int"),
+        new StructType().add("x", "int").add("y", "int"))))
+    val query = TestRelation(Seq(
+      $"a".int,
+      Symbol("m").map(
+        new StructType().add("x", "int").add("y", "int").add("z", "int"),
+        new StructType().add("x", "int").add("y", "int"))))
+
+    val parsedPlan = if (byNameResolution) byName(table, query) else 
byPosition(table, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      "Cannot write extra fields to struct 'm.key': 'z'"))
+  }
+
+  test("SPARK-42997: extra fields in struct inside map value (byName)") {
+    checkExtraFieldsInStructInsideMapValue(byNameResolution = true)
+  }
+
+  test("SPARK-42997: extra fields in struct inside map value (byPosition)") {
+    checkExtraFieldsInStructInsideMapValue(byNameResolution = false)
+  }
+
+  private def checkExtraFieldsInStructInsideMapValue(byNameResolution: 
Boolean): Unit = {
+    val table = TestRelation(Seq(
+      $"a".int,
+      Symbol("m").map(
+        new StructType().add("x", "int").add("y", "int"),
+        new StructType().add("x", "int").add("y", "int"))))
+    val query = TestRelation(Seq(
+      $"a".int,
+      Symbol("m").map(
+        new StructType().add("x", "int").add("y", "int"),
+        new StructType().add("x", "int").add("y", "int").add("z", "int"))))
+
+    val parsedPlan = if (byNameResolution) byName(table, query) else 
byPosition(table, query)
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      "Cannot write extra fields to struct 'm.value': 'z'"))
+  }
+
+  test("SPARK-42997: missing fields in nested struct (byName)") {
+    checkMissingFieldsInNestedStruct(byNameResolution = true)
+  }
+
+  test("SPARK-42997: missing fields in nested struct (byPosition)") {
+    checkMissingFieldsInNestedStruct(byNameResolution = false)
+  }
+
+  private def checkMissingFieldsInNestedStruct(byNameResolution: Boolean): 
Unit = {
+    val table = TestRelation(Seq(
+      $"a".int,
+      $"b".struct($"n1".int, $"n2".struct($"dn1".int, $"dn2".int, 
$"dn3".int))))
+    val query = TestRelation(Seq(
+      $"a".int,
+      $"b".struct($"n1".int, $"n2".struct($"dn1".int, $"dn2".int))))
+
+    val (parsedPlan, expectedErrMsg) = if (byNameResolution) {
+      byName(table, query) -> "Cannot find data for output column 'b.n2.dn3'"
+    } else {
+      byPosition(table, query) -> "Struct 'b.n2' missing fields: 'dn3'"
+    }
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      expectedErrMsg))
+  }
+
+  test("SPARK-42997: missing fields in struct inside array (byName)") {
+    checkMissingFieldsInStructInsideArray(byNameResolution = true)
+  }
+
+  test("SPARK-42997: missing fields in struct inside array (byPosition)") {
+    checkMissingFieldsInStructInsideArray(byNameResolution = false)
+  }
+
+  private def checkMissingFieldsInStructInsideArray(byNameResolution: 
Boolean): Unit = {
+    val table = TestRelation(Seq(
+      $"a".int,
+      $"arr".array(new StructType().add("x", "int").add("y", "int"))))
+    val query = TestRelation(Seq(
+      $"a".int,
+      $"arr".array(new StructType().add("x", "int"))))
+
+    val (parsedPlan, expectedErrMsg) = if (byNameResolution) {
+      byName(table, query) -> "Cannot find data for output column 
'arr.element.y'"
+    } else {
+      byPosition(table, query) -> "Struct 'arr.element' missing fields: 'y'"
+    }
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      expectedErrMsg))
+  }
+
+  test("SPARK-42997: missing fields in struct inside map key (byName)") {
+    checkMissingFieldsInStructInsideMapKey(byNameResolution = true)
+  }
+
+  test("SPARK-42997: missing fields in struct inside map key (byPosition)") {
+    checkMissingFieldsInStructInsideMapKey(byNameResolution = false)
+  }
+
+  private def checkMissingFieldsInStructInsideMapKey(byNameResolution: 
Boolean): Unit = {
+    val table = TestRelation(Seq(
+      $"a".int,
+      Symbol("m").map(
+        new StructType().add("x", "int").add("y", "int"),
+        new StructType().add("x", "int").add("y", "int"))))
+    val query = TestRelation(Seq(
+      $"a".int,
+      Symbol("m").map(
+        new StructType().add("x", "int"),
+        new StructType().add("x", "int").add("y", "int"))))
+
+    val (parsedPlan, expectedErrMsg) = if (byNameResolution) {
+      byName(table, query) -> "Cannot find data for output column 'm.key.y'"
+    } else {
+      byPosition(table, query) -> "Struct 'm.key' missing fields: 'y'"
+    }
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      expectedErrMsg))
+  }
+
+  test("SPARK-42997: missing fields in struct inside map value (byName)") {
+    checkMissingFieldsInStructInsideMapValue(byNameResolution = true)
+  }
+
+  test("SPARK-42997: missing fields in struct inside map value (byPosition)") {
+    checkMissingFieldsInStructInsideMapValue(byNameResolution = false)
+  }
+
+  private def checkMissingFieldsInStructInsideMapValue(byNameResolution: 
Boolean): Unit = {
+    val table = TestRelation(Seq(
+      $"a".int,
+      Symbol("m").map(
+        new StructType().add("x", "int").add("y", "int"),
+        new StructType().add("x", "int").add("y", "int"))))
+    val query = TestRelation(Seq(
+      $"a".int,
+      Symbol("m").map(
+        new StructType().add("x", "int").add("y", "int"),
+        new StructType().add("x", "int"))))
+
+    val (parsedPlan, expectedErrMsg) = if (byNameResolution) {
+      byName(table, query) -> "Cannot find data for output column 'm.value.y'"
+    } else {
+      byPosition(table, query) -> "Struct 'm.value' missing fields: 'y'"
+    }
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq(
+      "Cannot write incompatible data to table", "'table-name'",
+      expectedErrMsg))
+  }
+
   def assertNotResolved(logicalPlan: LogicalPlan): Unit = {
     assert(!logicalPlan.resolved, s"Plan should not be resolved: $logicalPlan")
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to