[
https://issues.apache.org/jira/browse/SPARK-25943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16674924#comment-16674924
]
James Baker edited comment on SPARK-25943 at 11/5/18 10:43 AM:
---------------------------------------------------------------
At the top level this is easy to fix by reordering the columns, but Spark
doesn't currently make it easy to reorder the fields of a struct by name.
As a snippet of how I could fix it:
{code:java}
/**
 * Rewrites `actual` so that struct fields are looked up by name and emitted in the order
 * declared by `expected`, recursing into array elements and nested structs.
 *
 * `actual` is returned untouched when its data type already equals `expected`, or when the
 * two types are not an array/array or struct/struct pair. A null array or struct is mapped
 * to a null literal cast to `expected`.
 *
 * @param expected the data type whose field order the result should follow
 * @param actual   the expression to rewrite
 * @return `actual` itself, or a new expression built on top of it
 */
private def reorderStructFields(expected: DataType, actual: Expression): Expression = {
  if (expected.equals(actual.dataType)) {
    actual
  } else {
    (expected, actual.dataType) match {
      case (ArrayType(expectedElement, _), ArrayType(actualElement, actualContainsNull)) =>
        // The lambda variable stands for one element of `actual`, so it must carry the
        // *actual* element type. Typing it with the expected element type makes the
        // recursive call see two equal types and return immediately, which means arrays
        // of structs would never be reordered.
        val element = NamedLambdaVariable("x", actualElement, actualContainsNull)
        val reordered = reorderStructFields(expectedElement, element)
        if (reordered.equals(element)) {
          // Nothing inside the element needed rewriting.
          actual
        } else {
          If(
            IsNull(actual),
            Cast(Literal(null), expected),
            ArrayTransform(actual, LambdaFunction(reordered, Seq(element))))
        }
      // Only rewrite when `actual` really is a struct; otherwise findStructIndex has no
      // fields to search and the expression is passed through like any other mismatch.
      case (StructType(expectedFields), _: StructType) =>
        val reorderedFields = expectedFields.toSeq.map { field =>
          val source = GetStructField(
            actual,
            findStructIndex(actual.dataType, field.name),
            Option(field.name))
          Alias(reorderStructFields(field.dataType, source), field.name)(
            explicitMetadata = Option(field.metadata))
        }
        If(
          IsNull(actual),
          Cast(Literal(null), expected),
          CreateStruct(reorderedFields))
      case _ => actual
    }
  }
}
/**
 * Returns the position of the field named `column` within the struct type `actual`.
 *
 * @param actual the data type to search; must be a struct type
 * @param column the field name to look up
 * @return the zero-based index of the first field whose name equals `column`
 * @throws IllegalArgumentException if `actual` is not a struct type, or if it has no field
 *                                  named `column`
 */
private def findStructIndex(actual: DataType, column: String): Int = {
  actual match {
    case StructType(fields) =>
      val index = fields.indexWhere(_.name == column)
      // Fail loudly instead of handing -1 to the caller as if it were a valid ordinal.
      if (index < 0) {
        throw new IllegalArgumentException(
          s"Field '$column' does not exist in struct type $actual")
      }
      index
    case other =>
      throw new IllegalArgumentException(
        s"Cannot look up field '$column' in non-struct type $other")
  }
}
{code}
but this clearly isn't ideal.
It's probably reasonable to just throw an error when the struct field names
don't match instead.
was (Author: jebaker):
I think this also conflicts with
[https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala#L369|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala#L369].
At the top level this is easy to fix by reordering the columns, but Spark
doesn't currently make it easy to reorder the fields of a struct by name.
As a snippet of how I could fix it:
{code:java}
/**
 * Rewrites `actual` so that struct fields are looked up by name and emitted in the order
 * declared by `expected`, recursing into array elements and nested structs.
 *
 * `actual` is returned untouched when its data type already equals `expected`, or when the
 * two types are not an array/array or struct/struct pair. A null array or struct is mapped
 * to a null literal cast to `expected`.
 *
 * @param expected the data type whose field order the result should follow
 * @param actual   the expression to rewrite
 * @return `actual` itself, or a new expression built on top of it
 */
private def reorderStructFields(expected: DataType, actual: Expression): Expression = {
  if (expected.equals(actual.dataType)) {
    actual
  } else {
    (expected, actual.dataType) match {
      case (ArrayType(expectedElement, _), ArrayType(actualElement, actualContainsNull)) =>
        // The lambda variable stands for one element of `actual`, so it must carry the
        // *actual* element type. Typing it with the expected element type makes the
        // recursive call see two equal types and return immediately, which means arrays
        // of structs would never be reordered.
        val element = NamedLambdaVariable("x", actualElement, actualContainsNull)
        val reordered = reorderStructFields(expectedElement, element)
        if (reordered.equals(element)) {
          // Nothing inside the element needed rewriting.
          actual
        } else {
          If(
            IsNull(actual),
            Cast(Literal(null), expected),
            ArrayTransform(actual, LambdaFunction(reordered, Seq(element))))
        }
      // Only rewrite when `actual` really is a struct; otherwise findStructIndex has no
      // fields to search and the expression is passed through like any other mismatch.
      case (StructType(expectedFields), _: StructType) =>
        val reorderedFields = expectedFields.toSeq.map { field =>
          val source = GetStructField(
            actual,
            findStructIndex(actual.dataType, field.name),
            Option(field.name))
          Alias(reorderStructFields(field.dataType, source), field.name)(
            explicitMetadata = Option(field.metadata))
        }
        If(
          IsNull(actual),
          Cast(Literal(null), expected),
          CreateStruct(reorderedFields))
      case _ => actual
    }
  }
}
/**
 * Returns the position of the field named `column` within the struct type `actual`.
 *
 * @param actual the data type to search; must be a struct type
 * @param column the field name to look up
 * @return the zero-based index of the first field whose name equals `column`
 * @throws IllegalArgumentException if `actual` is not a struct type, or if it has no field
 *                                  named `column`
 */
private def findStructIndex(actual: DataType, column: String): Int = {
  actual match {
    case StructType(fields) =>
      val index = fields.indexWhere(_.name == column)
      // Fail loudly instead of handing -1 to the caller as if it were a valid ordinal.
      if (index < 0) {
        throw new IllegalArgumentException(
          s"Field '$column' does not exist in struct type $actual")
      }
      index
    case other =>
      throw new IllegalArgumentException(
        s"Cannot look up field '$column' in non-struct type $other")
  }
}
{code}
but this clearly isn't ideal.
It's probably reasonable to just throw an error when the struct field names
don't match instead.
> Corruption when writing data into a catalog table with a different struct
> schema
> --------------------------------------------------------------------------------
>
> Key: SPARK-25943
> URL: https://issues.apache.org/jira/browse/SPARK-25943
> Project: Spark
> Issue Type: Bug
> Components: Optimizer, SQL
> Affects Versions: 2.3.2, 2.4.1, 2.5.0, 3.0.0
> Reporter: James Baker
> Priority: Major
>
> Suppose I have a catalog table with schema StructType(Seq(StructField("a",
> StructType(Seq(StructField("b", DataTypes.StringType), StructField("c",
> DataTypes.StringType)))))).
> Suppose I now try to append a record to it:
> {code:java}
> {"a": {"c": "data1", "b": "data2"}}
> {code}
> That record will actually be appended as:
> {code:java}
> {"a": {"b": "data1", "c": "data2"}}
> {code}
> which is obviously not close to what the user wanted or expected (for me it
> silently corrupted my data).
> It turns out that the user could provide a totally different record,
> {code:java}
> {"a": {"this column": "is totally different", "but": "the types match up"}}
> {code}
> and it'd still get written out, but as
> {code:java}
> {"a": {"b": "is totally different", "c": "the types match up"}}
> {code}
> This is because, in
> [DDLPreprocessingUtils.castAndRenameOutput|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala#L500],
> Spark puts effort into reordering column names in line with what the output
> expects, but merely casts any other types. This works nicely in a case where
> you try to e.g. write an int into a double field, but goes wrong on complex
> datatypes if the types match up but the field names do not.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]