This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2ebea13  [SPARK-33136][SQL] Fix mistakenly swapped parameter in V2WriteCommand.outputResolved
2ebea13 is described below
commit 2ebea135a1f4e4cb3187ffc8e77d3f52d3b3a91a
Author: Jungtaek Lim (HeartSaVioR) <[email protected]>
AuthorDate: Wed Oct 14 08:30:03 2020 -0700
[SPARK-33136][SQL] Fix mistakenly swapped parameter in V2WriteCommand.outputResolved
### What changes were proposed in this pull request?
This PR fixes a bug in `V2WriteCommand.outputResolved`, where `DataType.equalsIgnoreCompatibleNullability` was called with its parameters mistakenly swapped. The parameters of `DataType.equalsIgnoreCompatibleNullability` are `from` and `to`, so the correct order of the matched attributes is `inAttr` followed by `outAttr`.
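For illustration, here is a minimal sketch (not part of the patch) of why the order matters. It assumes it is compiled inside the `org.apache.spark.sql` package, since the method is `private[sql]`:

```scala
package org.apache.spark.sql

import org.apache.spark.sql.types.{ArrayType, DataType, LongType}

object NullabilityOrderSketch extends App {
  // Type produced by the query (`inAttr`): array elements are non-nullable.
  val written = ArrayType(LongType, containsNull = false)
  // Type declared by the table (`outAttr`): array elements are nullable.
  val table = ArrayType(LongType, containsNull = true)

  // Correct order (`from` = written, `to` = table): widening nullability is compatible.
  assert(DataType.equalsIgnoreCompatibleNullability(written, table))

  // Swapped order, as in the bug: the same write is considered incompatible.
  assert(!DataType.equalsIgnoreCompatibleNullability(table, written))
}
```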
### Why are the changes needed?
Spark throws an AnalysisException for an unresolved operator in v2 writes; the operator fails to resolve because the parameters passed to `DataType.equalsIgnoreCompatibleNullability` in `outputResolved` were swapped.
### Does this PR introduce _any_ user-facing change?
Yes. End users no longer hit the unresolved-operator error in v2 writes when writing a DataFrame containing non-nullable complex types to a table that declares the matching complex types as nullable.
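A hedged reproduction sketch of that scenario (the `testcat` catalog and `target` table name are illustrative, assuming a spark-shell style session with an in-memory v2 catalog registered as in the test suite):

```scala
import org.apache.spark.sql.functions.array
import spark.implicits._

// `spark.range` yields a non-nullable `id`, so `col` is an array<bigint>
// whose elements are non-nullable on the write side.
val df = spark.range(3).select(array($"id").as("col"))

// The target table declares `col` as array<bigint> with nullable elements.
spark.sql("CREATE TABLE testcat.target (col array<bigint>) USING foo")

// Before this fix: AnalysisException (unresolved operator). After: succeeds.
df.writeTo("testcat.target").append()
```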
### How was this patch tested?
New UT added.
Closes #30033 from HeartSaVioR/SPARK-33136.
Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 8e5cb1d276686ec428e4e6aa1c3cfd6bb99e4e9a)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/catalyst/plans/logical/v2Commands.scala | 2 +-
.../apache/spark/sql/DataFrameWriterV2Suite.scala | 87 +++++++++++++++++++++-
2 files changed, 84 insertions(+), 5 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index b4120d9..d2830a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -45,7 +45,7 @@ trait V2WriteCommand extends Command {
case (inAttr, outAttr) =>
// names and types must match, nullability must be compatible
inAttr.name == outAttr.name &&
- DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
+ DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outAttr.dataType) &&
(outAttr.nullable || !inAttr.nullable)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index 508eefa..ff5c624 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -23,16 +23,15 @@ import scala.collection.JavaConverters._
import org.scalatest.BeforeAndAfter
-import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamedRelation, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, V2WriteCommand}
import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog}
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
-import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -101,6 +100,86 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(v2.catalog.exists(_ == catalogPlugin))
}
+ case class FakeV2WriteCommand(table: NamedRelation, query: LogicalPlan) extends V2WriteCommand
+
+ test("SPARK-33136 output resolved on complex types for V2 write commands") {
+ val tableCatalog = catalog("testcat")
+
+ def assertTypeCompatibility(name: String, fromType: DataType, toType: DataType): Unit = {
+ val fromTableName = s"from_table_$name"
+ tableCatalog.createTable(
+ Identifier.of(Array(), fromTableName),
+ StructType(Array(StructField("col", fromType))),
+ Array.empty,
+ new java.util.HashMap[String, String]())
+
+ val toTable = tableCatalog.createTable(
+ Identifier.of(Array(), s"to_table_$name"),
+ StructType(Array(StructField("col", toType))),
+ Array.empty,
+ new java.util.HashMap[String, String]())
+
+ val df = spark.table(s"testcat.$fromTableName")
+
+ val relation = DataSourceV2Relation.create(toTable, Some(tableCatalog), None)
+ val writeCommand = FakeV2WriteCommand(relation, df.queryExecution.analyzed)
+
+ assert(writeCommand.outputResolved, s"Unable to write from type $fromType to type $toType.")
+ }
+
+ // The major difference between `from` and `to` is that `from` is a complex type
+ // with non-nullable fields, whereas `to` is the same data type with nullability flipped.
+
+ // nested struct type
+ val fromStructType = StructType(Array(
+ StructField("s", StringType),
+ StructField("i_nonnull", IntegerType, nullable = false),
+ StructField("st", StructType(Array(
+ StructField("l", LongType),
+ StructField("s_nonnull", StringType, nullable = false))))))
+
+ val toStructType = StructType(Array(
+ StructField("s", StringType),
+ StructField("i_nonnull", IntegerType),
+ StructField("st", StructType(Array(
+ StructField("l", LongType),
+ StructField("s_nonnull", StringType))))))
+
+ assertTypeCompatibility("struct", fromStructType, toStructType)
+
+ // array type
+ assertTypeCompatibility("array", ArrayType(LongType, containsNull = false),
+ ArrayType(LongType, containsNull = true))
+
+ // array type with struct type
+ val fromArrayWithStructType = ArrayType(
+ StructType(Array(StructField("s", StringType, nullable = false))),
+ containsNull = false)
+
+ val toArrayWithStructType = ArrayType(
+ StructType(Array(StructField("s", StringType))),
+ containsNull = true)
+
+ assertTypeCompatibility("array_struct", fromArrayWithStructType,
toArrayWithStructType)
+
+ // map type
+ assertTypeCompatibility("map", MapType(IntegerType, StringType,
valueContainsNull = false),
+ MapType(IntegerType, StringType, valueContainsNull = true))
+
+ // map type with struct type
+ val fromMapWithStructType = MapType(
+ IntegerType,
+ StructType(Array(StructField("s", StringType, nullable = false))),
+ valueContainsNull = false)
+
+ val toMapWithStructType = MapType(
+ IntegerType,
+ StructType(Array(StructField("s", StringType))),
+ valueContainsNull = true)
+
+ assertTypeCompatibility("map_struct", fromMapWithStructType,
toMapWithStructType)
+ }
+
test("Append: basic append") {
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING
foo")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]