This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a45e819da0ce [SPARK-50230][SQL] Added logic to support reading unknown 
collation name as utf8_binary
a45e819da0ce is described below

commit a45e819da0ce8fbfcfce64e3a962b95840c1c007
Author: Vladan Vasić <[email protected]>
AuthorDate: Thu Nov 7 18:41:19 2024 +0100

    [SPARK-50230][SQL] Added logic to support reading unknown collation name as 
utf8_binary
    
    ### What changes were proposed in this pull request?
    
    I propose adding a new `SQLConf` entry that enables Spark to read an 
invalid collation name as `UTF8_BINARY`.
    
    ### Why are the changes needed?
    
    These changes are needed when Spark must read a Delta table whose 
metadata uses a different convention for naming collations. Instead of 
failing, when this conf is enabled, Spark returns the `UTF8_BINARY` 
collation.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This patch was tested by adding tests in `DataTypeSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48760 from 
vladanvasi-db/vladanvasi-db/unknown-collation-name-enablement.
    
    Authored-by: Vladan Vasić <[email protected]>
    Signed-off-by: Max Gekk <[email protected]>
---
 .../org/apache/spark/sql/internal/SqlApiConf.scala |   3 +
 .../spark/sql/internal/SqlApiConfHelper.scala      |   2 +
 .../org/apache/spark/sql/types/DataType.scala      |  26 +++-
 .../org/apache/spark/sql/internal/SQLConf.scala    |  11 ++
 .../org/apache/spark/sql/types/DataTypeSuite.scala | 162 ++++++++++++++++++++-
 5 files changed, 199 insertions(+), 5 deletions(-)

diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala
index 555a56705308..9908021592e1 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala
@@ -46,6 +46,7 @@ private[sql] trait SqlApiConf {
   def defaultStringType: StringType
   def stackTracesInDataFrameContext: Int
   def legacyAllowUntypedScalaUDFs: Boolean
+  def allowReadingUnknownCollations: Boolean
 }
 
 private[sql] object SqlApiConf {
@@ -58,6 +59,7 @@ private[sql] object SqlApiConf {
     SqlApiConfHelper.LOCAL_RELATION_CACHE_THRESHOLD_KEY
   }
   val DEFAULT_COLLATION: String = SqlApiConfHelper.DEFAULT_COLLATION
+  val ALLOW_READING_UNKNOWN_COLLATIONS: String = 
SqlApiConfHelper.ALLOW_READING_UNKNOWN_COLLATIONS
 
   def get: SqlApiConf = SqlApiConfHelper.getConfGetter.get()()
 
@@ -85,4 +87,5 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf {
   override def defaultStringType: StringType = StringType
   override def stackTracesInDataFrameContext: Int = 1
   override def legacyAllowUntypedScalaUDFs: Boolean = false
+  override def allowReadingUnknownCollations: Boolean = false
 }
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConfHelper.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConfHelper.scala
index 13ef13e5894e..c8d6f395d450 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConfHelper.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConfHelper.scala
@@ -33,6 +33,8 @@ private[sql] object SqlApiConfHelper {
   val SESSION_LOCAL_TIMEZONE_KEY: String = "spark.sql.session.timeZone"
   val LOCAL_RELATION_CACHE_THRESHOLD_KEY: String = 
"spark.sql.session.localRelationCacheThreshold"
   val DEFAULT_COLLATION: String = "spark.sql.session.collation.default"
+  val ALLOW_READING_UNKNOWN_COLLATIONS: String =
+    "spark.sql.collation.allowReadingUnknownCollations"
 
   val confGetter: AtomicReference[() => SqlApiConf] = {
     new AtomicReference[() => SqlApiConf](() => DefaultSqlApiConf)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 008c9cd07076..0878abbd0a84 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -27,7 +27,7 @@ import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
-import org.apache.spark.{SparkIllegalArgumentException, SparkThrowable}
+import org.apache.spark.{SparkException, SparkIllegalArgumentException, 
SparkThrowable}
 import org.apache.spark.annotation.Stable
 import org.apache.spark.sql.catalyst.analysis.SqlApiAnalysis
 import org.apache.spark.sql.catalyst.parser.DataTypeParser
@@ -340,8 +340,17 @@ object DataType {
         fields.collect { case (fieldPath, JString(collation)) =>
           collation.split("\\.", 2) match {
             case Array(provider: String, collationName: String) =>
-              CollationFactory.assertValidProvider(provider)
-              fieldPath -> collationName
+              try {
+                CollationFactory.assertValidProvider(provider)
+                fieldPath -> collationName
+              } catch {
+                case e: SparkException
+                    if e.getCondition == "COLLATION_INVALID_PROVIDER" &&
+                      SqlApiConf.get.allowReadingUnknownCollations =>
+                  // If the collation provider is unknown and the config for 
reading such
+                  // collations is enabled, return the UTF8_BINARY collation.
+                  fieldPath -> "UTF8_BINARY"
+              }
           }
         }.toMap
 
@@ -350,7 +359,16 @@ object DataType {
   }
 
   private def stringTypeWithCollation(collationName: String): StringType = {
-    StringType(CollationFactory.collationNameToId(collationName))
+    try {
+      StringType(CollationFactory.collationNameToId(collationName))
+    } catch {
+      case e: SparkException
+          if e.getCondition == "COLLATION_INVALID_NAME" &&
+            SqlApiConf.get.allowReadingUnknownCollations =>
+        // If the collation name is unknown and the config for reading such 
collations is enabled,
+        // return the UTF8_BINARY collation.
+        StringType(CollationFactory.UTF8_BINARY_COLLATION_ID)
+    }
   }
 
   protected[types] def buildFormattedString(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4cebc90f6e2b..82e58b360488 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -770,6 +770,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(Utils.isTesting)
 
+  val ALLOW_READING_UNKNOWN_COLLATIONS =
+    buildConf(SqlApiConfHelper.ALLOW_READING_UNKNOWN_COLLATIONS)
+      .internal()
+      .doc("Enables spark to read unknown collation name as UTF8_BINARY. If 
the config is " +
+        "not enabled, when spark encounters an unknown collation name, it will 
throw an error.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val DEFAULT_COLLATION =
     buildConf(SqlApiConfHelper.DEFAULT_COLLATION)
       .doc("Sets default collation to use for string literals, parameter 
markers or the string" +
@@ -5525,6 +5534,8 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
     }
   }
 
+  override def allowReadingUnknownCollations: Boolean = 
getConf(ALLOW_READING_UNKNOWN_COLLATIONS)
+
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
   def adaptiveExecutionLogLevel: String = getConf(ADAPTIVE_EXECUTION_LOG_LEVEL)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index 3241f031a706..3552beb210a1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -23,11 +23,13 @@ import org.json4s.jackson.JsonMethods
 import org.apache.spark.{SparkException, SparkFunSuite, 
SparkIllegalArgumentException}
 import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, 
caseSensitiveResolution}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{CollationFactory, StringConcat}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataTypeTestUtils.{dayTimeIntervalTypes, 
yearMonthIntervalTypes}
 
-class DataTypeSuite extends SparkFunSuite {
+class DataTypeSuite extends SparkFunSuite with SQLHelper {
 
   private val UNICODE_COLLATION_ID = 
CollationFactory.collationNameToId("UNICODE")
 
@@ -876,6 +878,90 @@ class DataTypeSuite extends SparkFunSuite {
       }
   }
 
+  test("string field with invalid collation name") {
+    val collationProviders = Seq("spark", "icu")
+    collationProviders.foreach { provider =>
+      val json =
+        s"""
+           |{
+           |  "type": "struct",
+           |  "fields": [
+           |    {
+           |      "name": "c1",
+           |      "type": "string",
+           |      "nullable": false,
+           |      "metadata": {
+           |        "${DataType.COLLATIONS_METADATA_KEY}": {
+           |          "c1": "$provider.INVALID"
+           |        }
+           |      }
+           |    }
+           |  ]
+           |}
+           |""".stripMargin
+
+      // Check that the exception will be thrown in case of invalid collation 
name and
+      // ALLOW_READING_UNKNOWN_COLLATIONS config not enabled.
+      checkError(
+        exception = intercept[SparkException] {
+          DataType.fromJson(json)
+        },
+        condition = "COLLATION_INVALID_NAME",
+        parameters = Map(
+          "proposals" -> "id",
+          "collationName" -> "INVALID"))
+
+      // Check that the exception will not be thrown in case of invalid 
collation name and
+      // ALLOW_READING_UNKNOWN_COLLATIONS enabled, but UTF8_BINARY collation will be 
returned.
+      withSQLConf(SQLConf.ALLOW_READING_UNKNOWN_COLLATIONS.key -> "true") {
+        val dataType = DataType.fromJson(json)
+        assert(dataType === StructType(
+          StructField("c1", 
StringType(CollationFactory.UTF8_BINARY_COLLATION_ID), false) :: Nil))
+      }
+    }
+  }
+
+  test("string field with invalid collation provider") {
+    val json =
+      s"""
+         |{
+         |  "type": "struct",
+         |  "fields": [
+         |    {
+         |      "name": "c1",
+         |      "type": "string",
+         |      "nullable": false,
+         |      "metadata": {
+         |        "${DataType.COLLATIONS_METADATA_KEY}": {
+         |          "c1": "INVALID.INVALID"
+         |        }
+         |      }
+         |    }
+         |  ]
+         |}
+         |""".stripMargin
+
+
+    // Check that the exception will be thrown in case of invalid collation 
provider and
+    // ALLOW_READING_UNKNOWN_COLLATIONS config not enabled.
+    checkError(
+      exception = intercept[SparkException] {
+        DataType.fromJson(json)
+      },
+      condition = "COLLATION_INVALID_PROVIDER",
+      parameters = Map(
+        "supportedProviders" -> "spark, icu",
+        "provider" -> "INVALID"))
+
+    // Check that the exception will not be thrown in case of invalid 
collation provider and
+    // ALLOW_READING_UNKNOWN_COLLATIONS enabled, but UTF8_BINARY collation will be 
returned.
+    withSQLConf(SQLConf.ALLOW_READING_UNKNOWN_COLLATIONS.key -> "true") {
+      val dataType = DataType.fromJson(json)
+      assert(dataType === StructType(
+        StructField("c1", 
StringType(CollationFactory.UTF8_BINARY_COLLATION_ID), false) :: Nil))
+    }
+  }
+
   test("non string field has collation metadata") {
     val json =
       s"""
@@ -1023,6 +1109,42 @@ class DataTypeSuite extends SparkFunSuite {
     assert(parsedWithCollations === ArrayType(StringType(unicodeCollationId)))
   }
 
+  test("parse array type with invalid collation metadata") {
+    val utf8BinaryCollationId = CollationFactory.UTF8_BINARY_COLLATION_ID
+    val arrayJson =
+      s"""
+         |{
+         |  "type": "array",
+         |  "elementType": "string",
+         |  "containsNull": true
+         |}
+         |""".stripMargin
+
+    val collationsMap = Map("element" -> "INVALID")
+
+    // Parse without collations map
+    assert(DataType.parseDataType(JsonMethods.parse(arrayJson)) === 
ArrayType(StringType))
+
+    // Check that the exception will be thrown in case of invalid collation 
name and
+    // ALLOW_READING_UNKNOWN_COLLATIONS config not enabled.
+    checkError(
+      exception = intercept[SparkException] {
+        DataType.parseDataType(JsonMethods.parse(arrayJson), collationsMap = 
collationsMap)
+      },
+      condition = "COLLATION_INVALID_NAME",
+      parameters = Map(
+        "proposals" -> "id",
+        "collationName" -> "INVALID"))
+
+    // Check that the exception will not be thrown in case of invalid 
collation name and
+    // ALLOW_READING_UNKNOWN_COLLATIONS enabled, but UTF8_BINARY collation will be 
returned.
+    withSQLConf(SQLConf.ALLOW_READING_UNKNOWN_COLLATIONS.key -> "true") {
+      val dataType = DataType.parseDataType(
+        JsonMethods.parse(arrayJson), collationsMap = collationsMap)
+      assert(dataType === ArrayType(StringType(utf8BinaryCollationId)))
+    }
+  }
+
   test("parse map type with collation metadata") {
     val unicodeCollationId = CollationFactory.collationNameToId("UNICODE")
     val mapJson =
@@ -1046,6 +1168,44 @@ class DataTypeSuite extends SparkFunSuite {
       MapType(StringType(unicodeCollationId), StringType(unicodeCollationId)))
   }
 
+  test("parse map type with invalid collation metadata") {
+    val utf8BinaryCollationId = CollationFactory.UTF8_BINARY_COLLATION_ID
+    val mapJson =
+      s"""
+         |{
+         |  "type": "map",
+         |  "keyType": "string",
+         |  "valueType": "string",
+         |  "valueContainsNull": true
+         |}
+         |""".stripMargin
+
+    val collationsMap = Map("key" -> "INVALID", "value" -> "INVALID")
+
+    // Parse without collations map
+    assert(DataType.parseDataType(JsonMethods.parse(mapJson)) === 
MapType(StringType, StringType))
+
+    // Check that the exception will be thrown in case of invalid collation 
name and
+    // ALLOW_READING_UNKNOWN_COLLATIONS config not enabled.
+    checkError(
+      exception = intercept[SparkException] {
+        DataType.parseDataType(JsonMethods.parse(mapJson), collationsMap = 
collationsMap)
+      },
+      condition = "COLLATION_INVALID_NAME",
+      parameters = Map(
+        "proposals" -> "id",
+        "collationName" -> "INVALID"))
+
+    // Check that the exception will not be thrown in case of invalid 
collation name and
+    // ALLOW_READING_UNKNOWN_COLLATIONS enabled, but UTF8_BINARY collation will be 
returned.
+    withSQLConf(SQLConf.ALLOW_READING_UNKNOWN_COLLATIONS.key -> "true") {
+      val dataType = DataType.parseDataType(
+        JsonMethods.parse(mapJson), collationsMap = collationsMap)
+      assert(dataType === MapType(
+        StringType(utf8BinaryCollationId), StringType(utf8BinaryCollationId)))
+    }
+  }
+
   test("SPARK-48680: Add CharType and VarcharType to DataTypes JAVA API") {
     assert(DataTypes.createCharType(1) === CharType(1))
     assert(DataTypes.createVarcharType(100) === VarcharType(100))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to