This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a08859760a41 [SPARK-50262][SQL] Forbid specification complex types
during altering collation
a08859760a41 is described below
commit a08859760a41bde9642307483c5d282c82d31bff
Author: Alexvsalexvsalex <[email protected]>
AuthorDate: Mon Nov 11 16:32:33 2024 +0800
[SPARK-50262][SQL] Forbid specification complex types during altering
collation
### What changes were proposed in this pull request?
[SPARK-48413](https://issues.apache.org/jira/browse/SPARK-48413)
introduced the ability to change the collation on a table.
Because of the problem described below, I suggest shrinking the feature to only leaf string types.
### Why are the changes needed?
Right now there is a known problem with altering the collation when the table has
metadata. The alteration will fail because of a metadata mismatch between the user's
type (which doesn't have metadata) and the existing schema.
People will still be able to change any collation, but they need to use the
full column path (e.g. `c2.element` instead of `c2` for an array column).
### Does this PR introduce _any_ user-facing change?
Yes, but the changed feature wasn't released yet.
### How was this patch tested?
Added a new test and fixed old ones.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48803 from
Alexvsalexvsalex/SPARK-50262_forbid_complex_types_during_collation_change.
Authored-by: Alexvsalexvsalex <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/types/DataType.scala | 21 ----------
.../sql/catalyst/analysis/CheckAnalysis.scala | 10 ++---
.../org/apache/spark/sql/types/DataTypeSuite.scala | 12 +++---
.../org/apache/spark/sql/CollationSuite.scala | 48 ++++++++++++++++++++--
.../spark/sql/execution/command/DDLSuite.scala | 26 +++++++-----
5 files changed, 70 insertions(+), 47 deletions(-)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
b/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 0878abbd0a84..4cf7d8efb96a 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -460,27 +460,6 @@ object DataType {
// String types with possibly different collations are compatible.
case (_: StringType, _: StringType) => true
- case (ArrayType(fromElement, fromContainsNull), ArrayType(toElement,
toContainsNull)) =>
- (fromContainsNull == toContainsNull) &&
- equalsIgnoreCompatibleCollation(fromElement, toElement)
-
- case (
- MapType(fromKey, fromValue, fromContainsNull),
- MapType(toKey, toValue, toContainsNull)) =>
- fromContainsNull == toContainsNull &&
- // Map keys cannot change collation.
- fromKey == toKey &&
- equalsIgnoreCompatibleCollation(fromValue, toValue)
-
- case (StructType(fromFields), StructType(toFields)) =>
- fromFields.length == toFields.length &&
- fromFields.zip(toFields).forall { case (fromField, toField) =>
- fromField.name == toField.name &&
- fromField.nullable == toField.nullable &&
- fromField.metadata == toField.metadata &&
- equalsIgnoreCompatibleCollation(fromField.dataType, toField.dataType)
- }
-
case (fromDataType, toDataType) => fromDataType == toDataType
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 16899b656f30..724014273fed 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1546,10 +1546,7 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog with QueryErrorsB
.map(dt => col.field.copy(dataType = dt))
.getOrElse(col.field)
val newDataType = a.dataType.get
- val sameTypeExceptCollations =
- DataType.equalsIgnoreCompatibleCollation(field.dataType,
newDataType)
newDataType match {
- case _ if sameTypeExceptCollations => // Allow changing type
collations.
case _: StructType => alter.failAnalysis(
"CANNOT_UPDATE_FIELD.STRUCT_TYPE",
Map("table" -> toSQLId(table.name), "fieldName" ->
toSQLId(fieldName)))
@@ -1576,10 +1573,11 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog with QueryErrorsB
case (CharType(l1), CharType(l2)) => l1 == l2
case (CharType(l1), VarcharType(l2)) => l1 <= l2
case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
- case _ => Cast.canUpCast(from, to)
+ case _ =>
+ Cast.canUpCast(from, to) ||
+ DataType.equalsIgnoreCompatibleCollation(field.dataType,
newDataType)
}
-
- if (!sameTypeExceptCollations && !canAlterColumnType(field.dataType,
newDataType)) {
+ if (!canAlterColumnType(field.dataType, newDataType)) {
alter.failAnalysis(
errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
messageParameters = Map(
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index 3552beb210a1..d5fc4d87bb6a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -722,7 +722,7 @@ class DataTypeSuite extends SparkFunSuite with SQLHelper {
checkEqualsIgnoreCompatibleCollation(
ArrayType(StringType),
ArrayType(StringType("UTF8_LCASE")),
- expected = true
+ expected = false
)
checkEqualsIgnoreCompatibleCollation(
ArrayType(StringType),
@@ -732,12 +732,12 @@ class DataTypeSuite extends SparkFunSuite with SQLHelper {
checkEqualsIgnoreCompatibleCollation(
ArrayType(ArrayType(StringType)),
ArrayType(ArrayType(StringType("UTF8_LCASE"))),
- expected = true
+ expected = false
)
checkEqualsIgnoreCompatibleCollation(
MapType(StringType, StringType),
MapType(StringType, StringType("UTF8_LCASE")),
- expected = true
+ expected = false
)
checkEqualsIgnoreCompatibleCollation(
MapType(StringType("UTF8_LCASE"), StringType),
@@ -747,7 +747,7 @@ class DataTypeSuite extends SparkFunSuite with SQLHelper {
checkEqualsIgnoreCompatibleCollation(
MapType(StringType("UTF8_LCASE"), ArrayType(StringType)),
MapType(StringType("UTF8_LCASE"), ArrayType(StringType("UTF8_LCASE"))),
- expected = true
+ expected = false
)
checkEqualsIgnoreCompatibleCollation(
MapType(ArrayType(StringType), IntegerType),
@@ -762,12 +762,12 @@ class DataTypeSuite extends SparkFunSuite with SQLHelper {
checkEqualsIgnoreCompatibleCollation(
StructType(StructField("a", StringType) :: Nil),
StructType(StructField("a", StringType("UTF8_LCASE")) :: Nil),
- expected = true
+ expected = false
)
checkEqualsIgnoreCompatibleCollation(
StructType(StructField("a", ArrayType(StringType)) :: Nil),
StructType(StructField("a", ArrayType(StringType("UTF8_LCASE"))) :: Nil),
- expected = true
+ expected = false
)
checkEqualsIgnoreCompatibleCollation(
StructType(StructField("a", MapType(StringType, IntegerType)) :: Nil),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index 7d7c95a24ca6..9a47491b0cca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -34,7 +34,7 @@ import
org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAg
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.internal.{SqlApiConf, SQLConf}
-import org.apache.spark.sql.types.{ArrayType, MapType, StringType,
StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, Metadata,
MetadataBuilder, StringType, StructField, StructType}
class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
protected val v2Source = classOf[FakeV2ProviderWithCustomSchema].getName
@@ -529,15 +529,55 @@ class CollationSuite extends DatasourceV2SQLBase with
AdaptiveSparkPlanHelper {
|""".stripMargin)
sql(s"INSERT INTO $tableName VALUES ('a', array('b'), map(1, 'c'),
struct('d'))")
sql(s"ALTER TABLE $tableName ALTER COLUMN c1 TYPE STRING COLLATE
UTF8_LCASE")
- sql(s"ALTER TABLE $tableName ALTER COLUMN c2 TYPE ARRAY<STRING COLLATE
UNICODE_CI>")
- sql(s"ALTER TABLE $tableName ALTER COLUMN c3 TYPE MAP<INT, STRING
COLLATE UTF8_BINARY>")
- sql(s"ALTER TABLE $tableName ALTER COLUMN c4 TYPE STRUCT<t: STRING
COLLATE UNICODE>")
+ sql(s"ALTER TABLE $tableName ALTER COLUMN c2.element TYPE STRING COLLATE
UNICODE_CI")
+ sql(s"ALTER TABLE $tableName ALTER COLUMN c3.value TYPE STRING COLLATE
UTF8_BINARY")
+ sql(s"ALTER TABLE $tableName ALTER COLUMN c4.t TYPE STRING COLLATE
UNICODE")
checkAnswer(sql(s"SELECT collation(c1), collation(c2[0]), " +
s"collation(c3[1]), collation(c4.t) FROM $tableName"),
Seq(Row("UTF8_LCASE", "UNICODE_CI", "UTF8_BINARY", "UNICODE")))
}
}
+ test("SPARK-50262: Alter column with collation preserve metadata") {
+ def createMetadata(column: String): Metadata =
+ new MetadataBuilder().putString("key", column).build()
+
+ val tableName = "testcat.alter_column_tbl"
+ withTable(tableName) {
+ val df = spark.createDataFrame(
+ java.util.List.of[Row](),
+ StructType(Seq(
+ StructField("c1", StringType, metadata = createMetadata("c1")),
+ StructField("c2", ArrayType(StringType), metadata =
createMetadata("c2")),
+ StructField("c3", MapType(IntegerType, StringType), metadata =
createMetadata("c3")),
+ StructField("c4",
+ StructType(Seq(StructField("t", StringType, metadata =
createMetadata("c4t")))),
+ metadata = createMetadata("c4"))
+ ))
+ )
+ df.write.format("parquet").saveAsTable(tableName)
+
+ sql(s"INSERT INTO $tableName VALUES ('a', array('b'), map(1, 'c'),
struct('d'))")
+ sql(s"ALTER TABLE $tableName ALTER COLUMN c1 TYPE STRING COLLATE
UTF8_LCASE")
+ sql(s"ALTER TABLE $tableName ALTER COLUMN c2.element TYPE STRING COLLATE
UNICODE_CI")
+ sql(s"ALTER TABLE $tableName ALTER COLUMN c3.value TYPE STRING COLLATE
UTF8_BINARY")
+ sql(s"ALTER TABLE $tableName ALTER COLUMN c4.t TYPE STRING COLLATE
UNICODE")
+ val testCatalog = catalog("testcat").asTableCatalog
+ val tableSchema = testCatalog.loadTable(Identifier.of(Array(),
"alter_column_tbl")).schema()
+ val c1Metadata = tableSchema.find(_.name == "c1").get.metadata
+ assert(c1Metadata === createMetadata("c1"))
+ val c2Metadata = tableSchema.find(_.name == "c2").get.metadata
+ assert(c2Metadata === createMetadata("c2"))
+ val c3Metadata = tableSchema.find(_.name == "c3").get.metadata
+ assert(c3Metadata === createMetadata("c3"))
+ val c4Metadata = tableSchema.find(_.name == "c4").get.metadata
+ assert(c4Metadata === createMetadata("c4"))
+ val c4tMetadata = tableSchema.find(_.name == "c4").get.dataType
+ .asInstanceOf[StructType].find(_.name == "t").get.metadata
+ assert(c4tMetadata === createMetadata("c4t"))
+ }
+ }
+
test("SPARK-47210: Implicit casting of collated strings") {
val tableName = "parquet_dummy_implicit_cast_t22"
withTable(tableName) {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e07f6406901e..fec7183bc75e 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2344,18 +2344,22 @@ abstract class DDLSuite extends QueryTest with
DDLSuiteBase {
sql("CREATE TABLE t2(col ARRAY<STRING>) USING parquet")
sql("INSERT INTO t2 VALUES (ARRAY('a'))")
checkAnswer(sql("SELECT COLLATION(col[0]) FROM t2"), Row("UTF8_BINARY"))
- sql("ALTER TABLE t2 ALTER COLUMN col TYPE ARRAY<STRING COLLATE
UTF8_LCASE>")
- checkAnswer(sql("SELECT COLLATION(col[0]) FROM t2"), Row("UTF8_LCASE"))
+ assertThrows[AnalysisException] {
+ sql("ALTER TABLE t2 ALTER COLUMN col TYPE ARRAY<STRING COLLATE
UTF8_LCASE>")
+ }
+ checkAnswer(sql("SELECT COLLATION(col[0]) FROM t2"), Row("UTF8_BINARY"))
// `MapType` with collation.
sql("CREATE TABLE t3(col MAP<STRING, STRING>) USING parquet")
sql("INSERT INTO t3 VALUES (MAP('k', 'v'))")
checkAnswer(sql("SELECT COLLATION(col['k']) FROM t3"),
Row("UTF8_BINARY"))
- sql(
- """
- |ALTER TABLE t3 ALTER COLUMN col TYPE
- |MAP<STRING, STRING COLLATE UTF8_LCASE>""".stripMargin)
- checkAnswer(sql("SELECT COLLATION(col['k']) FROM t3"), Row("UTF8_LCASE"))
+ assertThrows[AnalysisException] {
+ sql(
+ """
+ |ALTER TABLE t3 ALTER COLUMN col TYPE
+ |MAP<STRING, STRING COLLATE UTF8_LCASE>""".stripMargin)
+ }
+ checkAnswer(sql("SELECT COLLATION(col['k']) FROM t3"),
Row("UTF8_BINARY"))
// Invalid change of map key collation.
val alterMap =
@@ -2367,7 +2371,7 @@ abstract class DDLSuite extends QueryTest with
DDLSuiteBase {
},
condition = "NOT_SUPPORTED_CHANGE_COLUMN",
parameters = Map(
- "originType" -> "\"MAP<STRING, STRING COLLATE UTF8_LCASE>\"",
+ "originType" -> "\"MAP<STRING, STRING>\"",
"originName" -> "`col`",
"table" -> "`spark_catalog`.`default`.`t3`",
"newType" -> "\"MAP<STRING COLLATE UTF8_LCASE, STRING>\"",
@@ -2380,8 +2384,10 @@ abstract class DDLSuite extends QueryTest with
DDLSuiteBase {
sql("CREATE TABLE t4(col STRUCT<a:STRING>) USING parquet")
sql("INSERT INTO t4 VALUES (NAMED_STRUCT('a', 'value'))")
checkAnswer(sql("SELECT COLLATION(col.a) FROM t4"), Row("UTF8_BINARY"))
- sql("ALTER TABLE t4 ALTER COLUMN col TYPE STRUCT<a:STRING COLLATE
UTF8_LCASE>")
- checkAnswer(sql("SELECT COLLATION(col.a) FROM t4"), Row("UTF8_LCASE"))
+ assertThrows[AnalysisException] {
+ sql("ALTER TABLE t4 ALTER COLUMN col TYPE STRUCT<a:STRING COLLATE
UTF8_LCASE>")
+ }
+ checkAnswer(sql("SELECT COLLATION(col.a) FROM t4"), Row("UTF8_BINARY"))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]