This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a45e819da0ce [SPARK-50230][SQL] Added logic to support reading unknown
collation name as utf8_binary
a45e819da0ce is described below
commit a45e819da0ce8fbfcfce64e3a962b95840c1c007
Author: Vladan Vasić <[email protected]>
AuthorDate: Thu Nov 7 18:41:19 2024 +0100
[SPARK-50230][SQL] Added logic to support reading unknown collation name as
utf8_binary
### What changes were proposed in this pull request?
I propose adding a new `SQLConf` entry which enables Spark to read an
invalid collation name as `UTF8_BINARY`.
### Why are the changes needed?
These changes are needed in cases where Spark needs to read a Delta table
whose metadata uses a different convention for naming collations. Instead of
failing, when this conf is enabled, Spark returns the `UTF8_BINARY`
collation.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This patch was tested by adding tests in `DataTypeSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48760 from
vladanvasi-db/vladanvasi-db/unknown-collation-name-enablement.
Authored-by: Vladan Vasić <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../org/apache/spark/sql/internal/SqlApiConf.scala | 3 +
.../spark/sql/internal/SqlApiConfHelper.scala | 2 +
.../org/apache/spark/sql/types/DataType.scala | 26 +++-
.../org/apache/spark/sql/internal/SQLConf.scala | 11 ++
.../org/apache/spark/sql/types/DataTypeSuite.scala | 162 ++++++++++++++++++++-
5 files changed, 199 insertions(+), 5 deletions(-)
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala
b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala
index 555a56705308..9908021592e1 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala
@@ -46,6 +46,7 @@ private[sql] trait SqlApiConf {
def defaultStringType: StringType
def stackTracesInDataFrameContext: Int
def legacyAllowUntypedScalaUDFs: Boolean
+ def allowReadingUnknownCollations: Boolean
}
private[sql] object SqlApiConf {
@@ -58,6 +59,7 @@ private[sql] object SqlApiConf {
SqlApiConfHelper.LOCAL_RELATION_CACHE_THRESHOLD_KEY
}
val DEFAULT_COLLATION: String = SqlApiConfHelper.DEFAULT_COLLATION
+ val ALLOW_READING_UNKNOWN_COLLATIONS: String =
SqlApiConfHelper.ALLOW_READING_UNKNOWN_COLLATIONS
def get: SqlApiConf = SqlApiConfHelper.getConfGetter.get()()
@@ -85,4 +87,5 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf {
override def defaultStringType: StringType = StringType
override def stackTracesInDataFrameContext: Int = 1
override def legacyAllowUntypedScalaUDFs: Boolean = false
+ override def allowReadingUnknownCollations: Boolean = false
}
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConfHelper.scala
b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConfHelper.scala
index 13ef13e5894e..c8d6f395d450 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConfHelper.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConfHelper.scala
@@ -33,6 +33,8 @@ private[sql] object SqlApiConfHelper {
val SESSION_LOCAL_TIMEZONE_KEY: String = "spark.sql.session.timeZone"
val LOCAL_RELATION_CACHE_THRESHOLD_KEY: String =
"spark.sql.session.localRelationCacheThreshold"
val DEFAULT_COLLATION: String = "spark.sql.session.collation.default"
+ val ALLOW_READING_UNKNOWN_COLLATIONS: String =
+ "spark.sql.collation.allowReadingUnknownCollations"
val confGetter: AtomicReference[() => SqlApiConf] = {
new AtomicReference[() => SqlApiConf](() => DefaultSqlApiConf)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
b/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 008c9cd07076..0878abbd0a84 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -27,7 +27,7 @@ import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
-import org.apache.spark.{SparkIllegalArgumentException, SparkThrowable}
+import org.apache.spark.{SparkException, SparkIllegalArgumentException,
SparkThrowable}
import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.analysis.SqlApiAnalysis
import org.apache.spark.sql.catalyst.parser.DataTypeParser
@@ -340,8 +340,17 @@ object DataType {
fields.collect { case (fieldPath, JString(collation)) =>
collation.split("\\.", 2) match {
case Array(provider: String, collationName: String) =>
- CollationFactory.assertValidProvider(provider)
- fieldPath -> collationName
+ try {
+ CollationFactory.assertValidProvider(provider)
+ fieldPath -> collationName
+ } catch {
+ case e: SparkException
+ if e.getCondition == "COLLATION_INVALID_PROVIDER" &&
+ SqlApiConf.get.allowReadingUnknownCollations =>
+ // If the collation provider is unknown and the config for
reading such
+ // collations is enabled, return the UTF8_BINARY collation.
+ fieldPath -> "UTF8_BINARY"
+ }
}
}.toMap
@@ -350,7 +359,16 @@ object DataType {
}
private def stringTypeWithCollation(collationName: String): StringType = {
- StringType(CollationFactory.collationNameToId(collationName))
+ try {
+ StringType(CollationFactory.collationNameToId(collationName))
+ } catch {
+ case e: SparkException
+ if e.getCondition == "COLLATION_INVALID_NAME" &&
+ SqlApiConf.get.allowReadingUnknownCollations =>
+ // If the collation name is unknown and the config for reading such
collations is enabled,
+ // return the UTF8_BINARY collation.
+ StringType(CollationFactory.UTF8_BINARY_COLLATION_ID)
+ }
}
protected[types] def buildFormattedString(
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4cebc90f6e2b..82e58b360488 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -770,6 +770,15 @@ object SQLConf {
.booleanConf
.createWithDefault(Utils.isTesting)
+ val ALLOW_READING_UNKNOWN_COLLATIONS =
+ buildConf(SqlApiConfHelper.ALLOW_READING_UNKNOWN_COLLATIONS)
+ .internal()
+ .doc("Enables spark to read unknown collation name as UTF8_BINARY. If
the config is " +
+ "not enabled, when spark encounters an unknown collation name, it will
throw an error.")
+ .version("4.0.0")
+ .booleanConf
+ .createWithDefault(false)
+
val DEFAULT_COLLATION =
buildConf(SqlApiConfHelper.DEFAULT_COLLATION)
.doc("Sets default collation to use for string literals, parameter
markers or the string" +
@@ -5525,6 +5534,8 @@ class SQLConf extends Serializable with Logging with
SqlApiConf {
}
}
+ override def allowReadingUnknownCollations: Boolean =
getConf(ALLOW_READING_UNKNOWN_COLLATIONS)
+
def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
def adaptiveExecutionLogLevel: String = getConf(ADAPTIVE_EXECUTION_LOG_LEVEL)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index 3241f031a706..3552beb210a1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -23,11 +23,13 @@ import org.json4s.jackson.JsonMethods
import org.apache.spark.{SparkException, SparkFunSuite,
SparkIllegalArgumentException}
import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution,
caseSensitiveResolution}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{CollationFactory, StringConcat}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataTypeTestUtils.{dayTimeIntervalTypes,
yearMonthIntervalTypes}
-class DataTypeSuite extends SparkFunSuite {
+class DataTypeSuite extends SparkFunSuite with SQLHelper {
private val UNICODE_COLLATION_ID =
CollationFactory.collationNameToId("UNICODE")
@@ -876,6 +878,90 @@ class DataTypeSuite extends SparkFunSuite {
}
}
+ test("string field with invalid collation name") {
+ val collationProviders = Seq("spark", "icu")
+ collationProviders.foreach { provider =>
+ val json =
+ s"""
+ |{
+ | "type": "struct",
+ | "fields": [
+ | {
+ | "name": "c1",
+ | "type": "string",
+ | "nullable": false,
+ | "metadata": {
+ | "${DataType.COLLATIONS_METADATA_KEY}": {
+ | "c1": "$provider.INVALID"
+ | }
+ | }
+ | }
+ | ]
+ |}
+ |""".stripMargin
+
+ // Check that the exception will be thrown in case of invalid collation
name and
+ // UNKNOWN_COLLATION_NAME config not enabled.
+ checkError(
+ exception = intercept[SparkException] {
+ DataType.fromJson(json)
+ },
+ condition = "COLLATION_INVALID_NAME",
+ parameters = Map(
+ "proposals" -> "id",
+ "collationName" -> "INVALID"))
+
+ // Check that the exception will not be thrown in case of invalid
collation name and
+ // UNKNOWN_COLLATION_NAME enabled, but UTF8_BINARY collation will be
returned.
+ withSQLConf(SQLConf.ALLOW_READING_UNKNOWN_COLLATIONS.key -> "true") {
+ val dataType = DataType.fromJson(json)
+ assert(dataType === StructType(
+ StructField("c1",
StringType(CollationFactory.UTF8_BINARY_COLLATION_ID), false) :: Nil))
+ }
+ }
+ }
+
+ test("string field with invalid collation provider") {
+ val json =
+ s"""
+ |{
+ | "type": "struct",
+ | "fields": [
+ | {
+ | "name": "c1",
+ | "type": "string",
+ | "nullable": false,
+ | "metadata": {
+ | "${DataType.COLLATIONS_METADATA_KEY}": {
+ | "c1": "INVALID.INVALID"
+ | }
+ | }
+ | }
+ | ]
+ |}
+ |""".stripMargin
+
+
+ // Check that the exception will be thrown in case of invalid collation
name and
+ // UNKNOWN_COLLATION_NAME config not enabled.
+ checkError(
+ exception = intercept[SparkException] {
+ DataType.fromJson(json)
+ },
+ condition = "COLLATION_INVALID_PROVIDER",
+ parameters = Map(
+ "supportedProviders" -> "spark, icu",
+ "provider" -> "INVALID"))
+
+ // Check that the exception will not be thrown in case of invalid
collation name and
+ // UNKNOWN_COLLATION_NAME enabled, but UTF8_BINARY collation will be
returned.
+ withSQLConf(SQLConf.ALLOW_READING_UNKNOWN_COLLATIONS.key -> "true") {
+ val dataType = DataType.fromJson(json)
+ assert(dataType === StructType(
+ StructField("c1",
StringType(CollationFactory.UTF8_BINARY_COLLATION_ID), false) :: Nil))
+ }
+ }
+
test("non string field has collation metadata") {
val json =
s"""
@@ -1023,6 +1109,42 @@ class DataTypeSuite extends SparkFunSuite {
assert(parsedWithCollations === ArrayType(StringType(unicodeCollationId)))
}
+ test("parse array type with invalid collation metadata") {
+ val utf8BinaryCollationId = CollationFactory.UTF8_BINARY_COLLATION_ID
+ val arrayJson =
+ s"""
+ |{
+ | "type": "array",
+ | "elementType": "string",
+ | "containsNull": true
+ |}
+ |""".stripMargin
+
+ val collationsMap = Map("element" -> "INVALID")
+
+ // Parse without collations map
+ assert(DataType.parseDataType(JsonMethods.parse(arrayJson)) ===
ArrayType(StringType))
+
+ // Check that the exception will be thrown in case of invalid collation
name and
+ // UNKNOWN_COLLATION_NAME config not enabled.
+ checkError(
+ exception = intercept[SparkException] {
+ DataType.parseDataType(JsonMethods.parse(arrayJson), collationsMap =
collationsMap)
+ },
+ condition = "COLLATION_INVALID_NAME",
+ parameters = Map(
+ "proposals" -> "id",
+ "collationName" -> "INVALID"))
+
+ // Check that the exception will not be thrown in case of invalid
collation name and
+ // UNKNOWN_COLLATION_NAME enabled, but UTF8_BINARY collation will be
returned.
+ withSQLConf(SQLConf.ALLOW_READING_UNKNOWN_COLLATIONS.key -> "true") {
+ val dataType = DataType.parseDataType(
+ JsonMethods.parse(arrayJson), collationsMap = collationsMap)
+ assert(dataType === ArrayType(StringType(utf8BinaryCollationId)))
+ }
+ }
+
test("parse map type with collation metadata") {
val unicodeCollationId = CollationFactory.collationNameToId("UNICODE")
val mapJson =
@@ -1046,6 +1168,44 @@ class DataTypeSuite extends SparkFunSuite {
MapType(StringType(unicodeCollationId), StringType(unicodeCollationId)))
}
+ test("parse map type with invalid collation metadata") {
+ val utf8BinaryCollationId = CollationFactory.UTF8_BINARY_COLLATION_ID
+ val mapJson =
+ s"""
+ |{
+ | "type": "map",
+ | "keyType": "string",
+ | "valueType": "string",
+ | "valueContainsNull": true
+ |}
+ |""".stripMargin
+
+ val collationsMap = Map("key" -> "INVALID", "value" -> "INVALID")
+
+ // Parse without collations map
+ assert(DataType.parseDataType(JsonMethods.parse(mapJson)) ===
MapType(StringType, StringType))
+
+ // Check that the exception will be thrown in case of invalid collation
name and
+ // UNKNOWN_COLLATION_NAME config not enabled.
+ checkError(
+ exception = intercept[SparkException] {
+ DataType.parseDataType(JsonMethods.parse(mapJson), collationsMap =
collationsMap)
+ },
+ condition = "COLLATION_INVALID_NAME",
+ parameters = Map(
+ "proposals" -> "id",
+ "collationName" -> "INVALID"))
+
+ // Check that the exception will not be thrown in case of invalid
collation name and
+ // UNKNOWN_COLLATION_NAME enabled, but UTF8_BINARY collation will be
returned.
+ withSQLConf(SQLConf.ALLOW_READING_UNKNOWN_COLLATIONS.key -> "true") {
+ val dataType = DataType.parseDataType(
+ JsonMethods.parse(mapJson), collationsMap = collationsMap)
+ assert(dataType === MapType(
+ StringType(utf8BinaryCollationId), StringType(utf8BinaryCollationId)))
+ }
+ }
+
test("SPARK-48680: Add CharType and VarcharType to DataTypes JAVA API") {
assert(DataTypes.createCharType(1) === CharType(1))
assert(DataTypes.createVarcharType(100) === VarcharType(100))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]