This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new cb48c0e48ee [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps cb48c0e48ee is described below commit cb48c0e48eeff2b7b51176d0241491300e5aad6f Author: aokolnychyi <aokolnyc...@apple.com> AuthorDate: Mon Apr 3 15:21:07 2023 -0700 [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps ### What changes were proposed in this pull request? This PR fixes `TableOutputResolver` to use correct column paths in error messages for arrays and maps. ### Why are the changes needed? These changes are needed to have accurate error messages when there is a type mismatch inside arrays and maps. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? This PR comes with tests. Closes #40630 from aokolnychyi/spark-42997. Authored-by: aokolnychyi <aokolnyc...@apple.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../catalyst/analysis/TableOutputResolver.scala | 18 +- .../catalyst/analysis/V2WriteAnalysisSuite.scala | 224 +++++++++++++++++++++ 2 files changed, 234 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index 71f4eb2918c..919acbe4117 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -179,8 +179,9 @@ object TableOutputResolver { addError(s"Cannot write nullable elements to array of non-nulls: '${colPath.quoted}'") None } else { - val param = NamedLambdaVariable("x", inputType.elementType, inputType.containsNull) - val fakeAttr = AttributeReference("x", expectedType.elementType, expectedType.containsNull)() + val param = NamedLambdaVariable("element", inputType.elementType, inputType.containsNull) + val fakeAttr = + AttributeReference("element", expectedType.elementType, expectedType.containsNull)() val res = reorderColumnsByName(Seq(param), Seq(fakeAttr), conf, addError, colPath) if (res.length == 1) { val func = LambdaFunction(res.head, Seq(param)) @@ -203,16 +204,17 @@ object TableOutputResolver { addError(s"Cannot write nullable values to map of non-nulls: '${colPath.quoted}'") None } else { - val keyParam = NamedLambdaVariable("k", inputType.keyType, nullable = false) - val fakeKeyAttr = AttributeReference("k", expectedType.keyType, nullable = false)() + val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable = false) + val fakeKeyAttr = AttributeReference("key", expectedType.keyType, nullable = false)() val resKey = reorderColumnsByName( - Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath :+ "key") + Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath) - val valueParam = NamedLambdaVariable("v", inputType.valueType, inputType.valueContainsNull) + val valueParam = + NamedLambdaVariable("value", inputType.valueType, inputType.valueContainsNull) val fakeValueAttr = - AttributeReference("v", expectedType.valueType, expectedType.valueContainsNull)() + AttributeReference("value", expectedType.valueType, expectedType.valueContainsNull)() val resValue = reorderColumnsByName( - Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath :+ "value") + Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath) if (resKey.length == 1 && resValue.length == 1) { val keyFunc = LambdaFunction(resKey.head, Seq(keyParam)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala index 69cd838cfb2..edcd6ec2368 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala @@ -634,6 +634,230 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest { } } + test("SPARK-42997: extra fields in nested struct (byName)") { + checkExtraFieldsInNestedStruct(byNameResolution = true) + } + + test("SPARK-42997: extra fields in nested struct (byPosition)") { + checkExtraFieldsInNestedStruct(byNameResolution = false) + } + + private def checkExtraFieldsInNestedStruct(byNameResolution: Boolean): Unit = { + val table = TestRelation(Seq( + $"a".int, + $"b".struct($"n1".int, $"n2".struct($"dn1".int, $"dn2".int)))) + val query = TestRelation(Seq( + $"a".int, + $"b".struct($"n1".int, $"n2".struct($"dn1".int, $"dn2".int, $"dn3".int)))) + + val parsedPlan = if (byNameResolution) byName(table, query) else byPosition(table, query) + + assertNotResolved(parsedPlan) + assertAnalysisError(parsedPlan, Seq( + "Cannot write incompatible data to table", "'table-name'", + "Cannot write extra fields to struct 'b.n2': 'dn3'")) + } + + test("SPARK-42997: extra fields in struct inside array (byName)") { + checkExtraFieldsInStructInsideArray(byNameResolution = true) + } + + test("SPARK-42997: extra fields in struct inside array (byPosition)") { + checkExtraFieldsInStructInsideArray(byNameResolution = false) + } + + private def checkExtraFieldsInStructInsideArray(byNameResolution: Boolean): Unit = { + val table = TestRelation(Seq( + $"a".int, + $"arr".array(new StructType().add("x", "int").add("y", "int")))) + val query = TestRelation(Seq( + $"a".int, + $"arr".array(new StructType().add("x", "int").add("y", "int").add("z", "int")))) + + val parsedPlan = if (byNameResolution) byName(table, query) else byPosition(table, query) + + assertNotResolved(parsedPlan) + assertAnalysisError(parsedPlan, Seq( + "Cannot write incompatible data to table", "'table-name'", + "Cannot write extra fields to struct 'arr.element': 'z'")) + } + + test("SPARK-42997: extra fields in struct inside map key (byName)") { + checkExtraFieldsInStructInsideMapKey(byNameResolution = true) + } + + test("SPARK-42997: extra fields in struct inside map key (byPosition)") { + checkExtraFieldsInStructInsideMapKey(byNameResolution = false) + } + + private def checkExtraFieldsInStructInsideMapKey(byNameResolution: Boolean): Unit = { + val table = TestRelation(Seq( + $"a".int, + Symbol("m").map( + new StructType().add("x", "int").add("y", "int"), + new StructType().add("x", "int").add("y", "int")))) + val query = TestRelation(Seq( + $"a".int, + Symbol("m").map( + new StructType().add("x", "int").add("y", "int").add("z", "int"), + new StructType().add("x", "int").add("y", "int")))) + + val parsedPlan = if (byNameResolution) byName(table, query) else byPosition(table, query) + + assertNotResolved(parsedPlan) + assertAnalysisError(parsedPlan, Seq( + "Cannot write incompatible data to table", "'table-name'", + "Cannot write extra fields to struct 'm.key': 'z'")) + } + + test("SPARK-42997: extra fields in struct inside map value (byName)") { + checkExtraFieldsInStructInsideMapValue(byNameResolution = true) + } + + test("SPARK-42997: extra fields in struct inside map value (byPosition)") { + checkExtraFieldsInStructInsideMapValue(byNameResolution = false) + } + + private def checkExtraFieldsInStructInsideMapValue(byNameResolution: Boolean): Unit = { + val table = TestRelation(Seq( + $"a".int, + Symbol("m").map( + new StructType().add("x", "int").add("y", "int"), + new StructType().add("x", "int").add("y", "int")))) + val query = TestRelation(Seq( + $"a".int, + Symbol("m").map( + new StructType().add("x", "int").add("y", "int"), + new StructType().add("x", "int").add("y", "int").add("z", "int")))) + + val parsedPlan = if (byNameResolution) byName(table, query) else byPosition(table, query) + + assertNotResolved(parsedPlan) + assertAnalysisError(parsedPlan, Seq( + "Cannot write incompatible data to table", "'table-name'", + "Cannot write extra fields to struct 'm.value': 'z'")) + } + + test("SPARK-42997: missing fields in nested struct (byName)") { + checkMissingFieldsInNestedStruct(byNameResolution = true) + } + + test("SPARK-42997: missing fields in nested struct (byPosition)") { + checkMissingFieldsInNestedStruct(byNameResolution = false) + } + + private def checkMissingFieldsInNestedStruct(byNameResolution: Boolean): Unit = { + val table = TestRelation(Seq( + $"a".int, + $"b".struct($"n1".int, $"n2".struct($"dn1".int, $"dn2".int, $"dn3".int)))) + val query = TestRelation(Seq( + $"a".int, + $"b".struct($"n1".int, $"n2".struct($"dn1".int, $"dn2".int)))) + + val (parsedPlan, expectedErrMsg) = if (byNameResolution) { + byName(table, query) -> "Cannot find data for output column 'b.n2.dn3'" + } else { + byPosition(table, query) -> "Struct 'b.n2' missing fields: 'dn3'" + } + + assertNotResolved(parsedPlan) + assertAnalysisError(parsedPlan, Seq( + "Cannot write incompatible data to table", "'table-name'", + expectedErrMsg)) + } + + test("SPARK-42997: missing fields in struct inside array (byName)") { + checkMissingFieldsInStructInsideArray(byNameResolution = true) + } + + test("SPARK-42997: missing fields in struct inside array (byPosition)") { + checkMissingFieldsInStructInsideArray(byNameResolution = false) + } + + private def checkMissingFieldsInStructInsideArray(byNameResolution: Boolean): Unit = { + val table = TestRelation(Seq( + $"a".int, + $"arr".array(new StructType().add("x", "int").add("y", "int")))) + val query = TestRelation(Seq( + $"a".int, + $"arr".array(new StructType().add("x", "int")))) + + val (parsedPlan, expectedErrMsg) = if (byNameResolution) { + byName(table, query) -> "Cannot find data for output column 'arr.element.y'" + } else { + byPosition(table, query) -> "Struct 'arr.element' missing fields: 'y'" + } + + assertNotResolved(parsedPlan) + assertAnalysisError(parsedPlan, Seq( + "Cannot write incompatible data to table", "'table-name'", + expectedErrMsg)) + } + + test("SPARK-42997: missing fields in struct inside map key (byName)") { + checkMissingFieldsInStructInsideMapKey(byNameResolution = true) + } + + test("SPARK-42997: missing fields in struct inside map key (byPosition)") { + checkMissingFieldsInStructInsideMapKey(byNameResolution = false) + } + + private def checkMissingFieldsInStructInsideMapKey(byNameResolution: Boolean): Unit = { + val table = TestRelation(Seq( + $"a".int, + Symbol("m").map( + new StructType().add("x", "int").add("y", "int"), + new StructType().add("x", "int").add("y", "int")))) + val query = TestRelation(Seq( + $"a".int, + Symbol("m").map( + new StructType().add("x", "int"), + new StructType().add("x", "int").add("y", "int")))) + + val (parsedPlan, expectedErrMsg) = if (byNameResolution) { + byName(table, query) -> "Cannot find data for output column 'm.key.y'" + } else { + byPosition(table, query) -> "Struct 'm.key' missing fields: 'y'" + } + + assertNotResolved(parsedPlan) + assertAnalysisError(parsedPlan, Seq( + "Cannot write incompatible data to table", "'table-name'", + expectedErrMsg)) + } + + test("SPARK-42997: missing fields in struct inside map value (byName)") { + checkMissingFieldsInStructInsideMapValue(byNameResolution = true) + } + + test("SPARK-42997: missing fields in struct inside map value (byPosition)") { + checkMissingFieldsInStructInsideMapValue(byNameResolution = false) + } + + private def checkMissingFieldsInStructInsideMapValue(byNameResolution: Boolean): Unit = { + val table = TestRelation(Seq( + $"a".int, + Symbol("m").map( + new StructType().add("x", "int").add("y", "int"), + new StructType().add("x", "int").add("y", "int")))) + val query = TestRelation(Seq( + $"a".int, + Symbol("m").map( + new StructType().add("x", "int").add("y", "int"), + new StructType().add("x", "int")))) + + val (parsedPlan, expectedErrMsg) = if (byNameResolution) { + byName(table, query) -> "Cannot find data for output column 'm.value.y'" + } else { + byPosition(table, query) -> "Struct 'm.value' missing fields: 'y'" + } + + assertNotResolved(parsedPlan) + assertAnalysisError(parsedPlan, Seq( + "Cannot write incompatible data to table", "'table-name'", + expectedErrMsg)) + } + def assertNotResolved(logicalPlan: LogicalPlan): Unit = { assert(!logicalPlan.resolved, s"Plan should not be resolved: $logicalPlan") } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org