This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ee662c28ffb [SPARK-47431][SQL] Add session level default Collation
6ee662c28ffb is described below

commit 6ee662c28ffb0deb70f08a971f9c1869288d39ba
Author: Mihailo Milosevic <mihailo.milose...@databricks.com>
AuthorDate: Tue Mar 26 20:47:44 2024 +0800

    [SPARK-47431][SQL] Add session level default Collation
    
    ### What changes were proposed in this pull request?
    This PR adds a DEFAULT_COLLATION configuration to `SqlApiConf` and makes sure 
literals are created with the default collation. This PR also renames the misused 
`isDefaultCollation` in the code.
    
    ### Why are the changes needed?
    These changes are needed to keep the code clean and consistent. They are also 
closely related to casting rules. The default collation is defined as the session-level 
collation, not as UTF8_BINARY. UTF8_BINARY was previously the default because its 
implementation does not call ICU, but that naming was not accurate.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Users can define a different default collation, but casting in functions 
is still not supported.
    
    ### How was this patch tested?
    Added test to `CollationSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45592 from mihailom-db/SPARK-47431.
    
    Authored-by: Mihailo Milosevic <mihailo.milose...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/util/CollationFactory.java  | 44 ++++++++++++++++------
 .../org/apache/spark/unsafe/types/UTF8String.java  | 12 +++---
 .../spark/unsafe/types/CollationFactorySuite.scala |  8 ++--
 .../sql/catalyst/parser/DataTypeAstBuilder.scala   |  2 +-
 .../org/apache/spark/sql/internal/SqlApiConf.scala |  8 +++-
 .../spark/sql/internal/SqlApiConfHelper.scala      |  1 +
 .../org/apache/spark/sql/types/StringType.scala    | 25 +++++++-----
 .../expressions/CollationTypeConstraints.scala     |  6 +--
 .../expressions/codegen/CodeGenerator.scala        |  4 +-
 .../spark/sql/catalyst/expressions/hash.scala      |  4 +-
 .../spark/sql/catalyst/expressions/literals.scala  | 19 +++++-----
 .../catalyst/expressions/stringExpressions.scala   | 12 +++---
 .../sql/catalyst/types/PhysicalDataType.scala      |  5 ++-
 .../spark/sql/catalyst/util/GeneratedColumn.scala  |  4 +-
 .../apache/spark/sql/catalyst/util/TypeUtils.scala |  2 +-
 .../spark/sql/catalyst/util/UnsafeRowUtils.scala   |  2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala    | 20 +++++++++-
 .../org/apache/spark/sql/util/SchemaUtils.scala    |  4 +-
 .../sql/execution/aggregate/HashMapGenerator.scala |  4 +-
 .../spark/sql/execution/columnar/ColumnType.scala  |  3 +-
 .../sql/execution/datasources/BucketingUtils.scala |  2 +-
 .../execution/datasources/DataSourceUtils.scala    |  2 +-
 .../execution/datasources/PartitioningUtils.scala  |  2 +-
 .../org/apache/spark/sql/CollationSuite.scala      | 39 +++++++++++++++++--
 24 files changed, 158 insertions(+), 76 deletions(-)

diff --git 
a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
 
b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
index 2940900b974a..119508a37e71 100644
--- 
a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
+++ 
b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
@@ -65,10 +65,18 @@ public final class CollationFactory {
     public final BiFunction<UTF8String, UTF8String, Boolean> equalsFunction;
 
     /**
-     * Binary collation implies that UTF8Strings are considered equal only if 
they are
-     * byte for byte equal. All accent or case-insensitive collations are 
considered non-binary.
+     * Support for Binary Equality implies that it is possible to check 
equality on
+     * byte by byte level. This allows for the usage of binaryEquals call on 
UTF8Strings
+     * which is more performant than calls to external ICU library.
      */
-    public final boolean isBinaryCollation;
+    public final boolean supportsBinaryEquality;
+    /**
+     * Support for Binary Ordering implies that it is possible to check 
equality and ordering on
+     * byte by byte level. This allows for the usage of binaryEquals and 
binaryCompare calls on
+     * UTF8Strings which is more performant than calls to external ICU 
library. Support for
+     * Binary Ordering implies support for Binary Equality.
+     */
+    public final boolean supportsBinaryOrdering;
 
     public Collation(
         String collationName,
@@ -76,15 +84,20 @@ public final class CollationFactory {
         Comparator<UTF8String> comparator,
         String version,
         ToLongFunction<UTF8String> hashFunction,
-        boolean isBinaryCollation) {
+        boolean supportsBinaryEquality,
+        boolean supportsBinaryOrdering) {
       this.collationName = collationName;
       this.collator = collator;
       this.comparator = comparator;
       this.version = version;
       this.hashFunction = hashFunction;
-      this.isBinaryCollation = isBinaryCollation;
+      this.supportsBinaryEquality = supportsBinaryEquality;
+      this.supportsBinaryOrdering = supportsBinaryOrdering;
+
+      // De Morgan's Law to check supportsBinaryOrdering => 
supportsBinaryEquality
+      assert(!supportsBinaryOrdering || supportsBinaryEquality);
 
-      if (isBinaryCollation) {
+      if (supportsBinaryEquality) {
         this.equalsFunction = UTF8String::equals;
       } else {
         this.equalsFunction = (s1, s2) -> this.comparator.compare(s1, s2) == 0;
@@ -95,22 +108,27 @@ public final class CollationFactory {
      * Constructor with comparators that are inherited from the given collator.
      */
     public Collation(
-        String collationName, Collator collator, String version, boolean 
isBinaryCollation) {
+        String collationName,
+        Collator collator,
+        String version,
+        boolean supportsBinaryEquality,
+        boolean supportsBinaryOrdering) {
       this(
         collationName,
         collator,
         (s1, s2) -> collator.compare(s1.toString(), s2.toString()),
         version,
         s -> (long)collator.getCollationKey(s.toString()).hashCode(),
-        isBinaryCollation);
+        supportsBinaryEquality,
+        supportsBinaryOrdering);
     }
   }
 
   private static final Collation[] collationTable = new Collation[4];
   private static final HashMap<String, Integer> collationNameToIdMap = new 
HashMap<>();
 
-  public static final int DEFAULT_COLLATION_ID = 0;
-  public static final int LOWERCASE_COLLATION_ID = 1;
+  public static final int UTF8_BINARY_COLLATION_ID = 0;
+  public static final int UTF8_BINARY_LCASE_COLLATION_ID = 1;
 
   static {
     // Binary comparison. This is the default collation.
@@ -122,6 +140,7 @@ public final class CollationFactory {
       UTF8String::binaryCompare,
       "1.0",
       s -> (long)s.hashCode(),
+      true,
       true);
 
     // Case-insensitive UTF8 binary collation.
@@ -132,17 +151,18 @@ public final class CollationFactory {
       (s1, s2) -> s1.toLowerCase().binaryCompare(s2.toLowerCase()),
       "1.0",
       (s) -> (long)s.toLowerCase().hashCode(),
+      false,
       false);
 
     // UNICODE case sensitive comparison (ROOT locale, in ICU).
     collationTable[2] = new Collation(
-      "UNICODE", Collator.getInstance(ULocale.ROOT), "153.120.0.0", true);
+      "UNICODE", Collator.getInstance(ULocale.ROOT), "153.120.0.0", true, 
false);
     collationTable[2].collator.setStrength(Collator.TERTIARY);
     collationTable[2].collator.freeze();
 
     // UNICODE case-insensitive comparison (ROOT locale, in ICU + Secondary 
strength).
     collationTable[3] = new Collation(
-      "UNICODE_CI", Collator.getInstance(ULocale.ROOT), "153.120.0.0", false);
+      "UNICODE_CI", Collator.getInstance(ULocale.ROOT), "153.120.0.0", false, 
false);
     collationTable[3].collator.setStrength(Collator.SECONDARY);
     collationTable[3].collator.freeze();
 
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index 6abc8385da5a..c5dfb91f06c6 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -343,10 +343,10 @@ public final class UTF8String implements 
Comparable<UTF8String>, Externalizable,
   }
 
   public boolean contains(final UTF8String substring, int collationId) {
-    if (CollationFactory.fetchCollation(collationId).isBinaryCollation) {
+    if (CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
       return this.contains(substring);
     }
-    if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
+    if (collationId == CollationFactory.UTF8_BINARY_LCASE_COLLATION_ID) {
       return this.toLowerCase().contains(substring.toLowerCase());
     }
     return collatedContains(substring, collationId);
@@ -394,10 +394,10 @@ public final class UTF8String implements 
Comparable<UTF8String>, Externalizable,
   }
 
   public boolean startsWith(final UTF8String prefix, int collationId) {
-    if (CollationFactory.fetchCollation(collationId).isBinaryCollation) {
+    if (CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
       return this.startsWith(prefix);
     }
-    if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
+    if (collationId == CollationFactory.UTF8_BINARY_LCASE_COLLATION_ID) {
       return this.toLowerCase().startsWith(prefix.toLowerCase());
     }
     return matchAt(prefix, 0, collationId);
@@ -408,10 +408,10 @@ public final class UTF8String implements 
Comparable<UTF8String>, Externalizable,
   }
 
   public boolean endsWith(final UTF8String suffix, int collationId) {
-    if (CollationFactory.fetchCollation(collationId).isBinaryCollation) {
+    if (CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
       return this.endsWith(suffix);
     }
-    if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
+    if (collationId == CollationFactory.UTF8_BINARY_LCASE_COLLATION_ID) {
       return this.toLowerCase().endsWith(suffix.toLowerCase());
     }
     return matchAt(suffix, numBytes - suffix.numBytes, collationId);
diff --git 
a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala
 
b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala
index 0a9ff7558e3a..768d26bf0e11 100644
--- 
a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala
+++ 
b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala
@@ -32,19 +32,19 @@ class CollationFactorySuite extends AnyFunSuite with 
Matchers { // scalastyle:ig
   test("collationId stability") {
     val utf8Binary = fetchCollation(0)
     assert(utf8Binary.collationName == "UTF8_BINARY")
-    assert(utf8Binary.isBinaryCollation)
+    assert(utf8Binary.supportsBinaryEquality)
 
     val utf8BinaryLcase = fetchCollation(1)
     assert(utf8BinaryLcase.collationName == "UTF8_BINARY_LCASE")
-    assert(!utf8BinaryLcase.isBinaryCollation)
+    assert(!utf8BinaryLcase.supportsBinaryEquality)
 
     val unicode = fetchCollation(2)
     assert(unicode.collationName == "UNICODE")
-    assert(unicode.isBinaryCollation);
+    assert(unicode.supportsBinaryEquality);
 
     val unicodeCi = fetchCollation(3)
     assert(unicodeCi.collationName == "UNICODE_CI")
-    assert(!unicodeCi.isBinaryCollation)
+    assert(!unicodeCi.supportsBinaryEquality)
   }
 
   test("fetch invalid collation name") {
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala
index 5c2df6a6e9d9..3b2bfda9a76a 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala
@@ -74,7 +74,7 @@ class DataTypeAstBuilder extends 
SqlBaseParserBaseVisitor[AnyRef] {
       case (TIMESTAMP_LTZ, Nil) => TimestampType
       case (STRING, Nil) =>
         typeCtx.children.asScala.toSeq match {
-          case Seq(_) => StringType
+          case Seq(_) => SqlApiConf.get.defaultStringType
           case Seq(_, ctx: CollateClauseContext) =>
             val collationName = visitCollateClause(ctx)
             val collationId = CollationFactory.collationNameToId(collationName)
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala
index 5ec72b83837e..99f7f2a11f2e 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala
@@ -20,7 +20,7 @@ import java.util.TimeZone
 
 import scala.util.Try
 
-import org.apache.spark.sql.types.{AtomicType, TimestampType}
+import org.apache.spark.sql.types.{AtomicType, StringType, TimestampType}
 import org.apache.spark.util.SparkClassUtils
 
 /**
@@ -43,6 +43,7 @@ private[sql] trait SqlApiConf {
   def datetimeJava8ApiEnabled: Boolean
   def sessionLocalTimeZone: String
   def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value
+  def defaultStringType: StringType
 }
 
 private[sql] object SqlApiConf {
@@ -51,8 +52,10 @@ private[sql] object SqlApiConf {
   val LEGACY_TIME_PARSER_POLICY_KEY: String = 
SqlApiConfHelper.LEGACY_TIME_PARSER_POLICY_KEY
   val CASE_SENSITIVE_KEY: String = SqlApiConfHelper.CASE_SENSITIVE_KEY
   val SESSION_LOCAL_TIMEZONE_KEY: String = 
SqlApiConfHelper.SESSION_LOCAL_TIMEZONE_KEY
-  val LOCAL_RELATION_CACHE_THRESHOLD_KEY: String =
+  val LOCAL_RELATION_CACHE_THRESHOLD_KEY: String = {
     SqlApiConfHelper.LOCAL_RELATION_CACHE_THRESHOLD_KEY
+  }
+  val DEFAULT_COLLATION: String = SqlApiConfHelper.DEFAULT_COLLATION
 
   def get: SqlApiConf = SqlApiConfHelper.getConfGetter.get()()
 
@@ -77,4 +80,5 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf {
   override def datetimeJava8ApiEnabled: Boolean = false
   override def sessionLocalTimeZone: String = TimeZone.getDefault.getID
   override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = 
LegacyBehaviorPolicy.EXCEPTION
+  override def defaultStringType: StringType = StringType
 }
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConfHelper.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConfHelper.scala
index 79b6cb9231c5..b7b8e14afb38 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConfHelper.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConfHelper.scala
@@ -32,6 +32,7 @@ private[sql] object SqlApiConfHelper {
   val CASE_SENSITIVE_KEY: String = "spark.sql.caseSensitive"
   val SESSION_LOCAL_TIMEZONE_KEY: String = "spark.sql.session.timeZone"
   val LOCAL_RELATION_CACHE_THRESHOLD_KEY: String = 
"spark.sql.session.localRelationCacheThreshold"
+  val DEFAULT_COLLATION: String = "spark.sql.session.collation.default"
 
   val confGetter: AtomicReference[() => SqlApiConf] = {
     new AtomicReference[() => SqlApiConf](() => DefaultSqlApiConf)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/StringType.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/types/StringType.scala
index 2b88f9a01a73..19e1de915add 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/StringType.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/StringType.scala
@@ -29,25 +29,32 @@ import org.apache.spark.sql.catalyst.util.CollationFactory
 @Stable
 class StringType private(val collationId: Int) extends AtomicType with 
Serializable {
   /**
-   * Returns whether assigned collation is the default spark collation 
(UTF8_BINARY).
+   * Support for Binary Equality implies that strings are considered equal 
only if
+   * they are byte for byte equal. E.g. all accent or case-insensitive 
collations are considered
+   * non-binary. If this field is true, byte level operations can be used 
against this datatype
+   * (e.g. for equality and hashing).
    */
-  def isDefaultCollation: Boolean = collationId == 
CollationFactory.DEFAULT_COLLATION_ID
+  def supportsBinaryEquality: Boolean =
+    CollationFactory.fetchCollation(collationId).supportsBinaryEquality
+  def isUTF8BinaryLcaseCollation: Boolean =
+    collationId == CollationFactory.UTF8_BINARY_LCASE_COLLATION_ID
 
   /**
-   * Binary collation implies that strings are considered equal only if they 
are
-   * byte for byte equal. E.g. all accent or case-insensitive collations are 
considered non-binary.
-   * If this field is true, byte level operations can be used against this 
datatype (e.g. for
-   * equality and hashing).
+   * Support for Binary Ordering implies that strings are considered equal only
+   * if they are byte for byte equal. E.g. all accent or case-insensitive 
collations are
+   * considered non-binary. Also their ordering does not require calls to ICU 
library, as
+   * it follows spark internal implementation. If this field is true, byte 
level operations
+   * can be used against this datatype (e.g. for equality, hashing and 
ordering).
    */
-  def isBinaryCollation: Boolean = 
CollationFactory.fetchCollation(collationId).isBinaryCollation
-  def isLowercaseCollation: Boolean = collationId == 
CollationFactory.LOWERCASE_COLLATION_ID
+  def supportsBinaryOrdering: Boolean =
+    CollationFactory.fetchCollation(collationId).supportsBinaryOrdering
 
   /**
    * Type name that is shown to the customer.
    * If this is an UTF8_BINARY collation output is `string` due to backwards 
compatibility.
    */
   override def typeName: String =
-    if (isDefaultCollation) "string"
+    if (collationId == 0) "string"
     else s"string collate 
${CollationFactory.fetchCollation(collationId).collationName}"
 
   override def equals(obj: Any): Boolean =
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationTypeConstraints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationTypeConstraints.scala
index cd909a45c1ed..8b09f0ccb464 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationTypeConstraints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationTypeConstraints.scala
@@ -55,7 +55,7 @@ abstract class StringTypeCollated extends AbstractDataType {
 case object StringTypeBinary extends StringTypeCollated {
   override private[sql] def simpleString: String = "string_binary"
   override private[sql] def acceptsType(other: DataType): Boolean =
-    other.isInstanceOf[StringType] && 
other.asInstanceOf[StringType].isBinaryCollation
+    other.isInstanceOf[StringType] && 
other.asInstanceOf[StringType].supportsBinaryEquality
 }
 
 /**
@@ -64,8 +64,8 @@ case object StringTypeBinary extends StringTypeCollated {
 case object StringTypeBinaryLcase extends StringTypeCollated {
   override private[sql] def simpleString: String = "string_binary_lcase"
   override private[sql] def acceptsType(other: DataType): Boolean =
-    other.isInstanceOf[StringType] && 
(other.asInstanceOf[StringType].isBinaryCollation ||
-      other.asInstanceOf[StringType].isLowercaseCollation)
+    other.isInstanceOf[StringType] && 
(other.asInstanceOf[StringType].supportsBinaryEquality ||
+      other.asInstanceOf[StringType].isUTF8BinaryLcaseCollation)
 }
 
 /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index d9878c873a77..46349a7faf03 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -622,7 +622,7 @@ class CodegenContext extends Logging {
       s"((java.lang.Float.isNaN($c1) && java.lang.Float.isNaN($c2)) || $c1 == 
$c2)"
     case DoubleType =>
       s"((java.lang.Double.isNaN($c1) && java.lang.Double.isNaN($c2)) || $c1 
== $c2)"
-    case st: StringType if st.isDefaultCollation => s"$c1.binaryEquals($c2)"
+    case st: StringType if st.supportsBinaryOrdering => 
s"$c1.binaryEquals($c2)"
     case st: StringType => s"$c1.semanticEquals($c2, ${st.collationId})"
     case dt: DataType if isPrimitiveType(dt) => s"$c1 == $c2"
     case dt: DataType if dt.isInstanceOf[AtomicType] => s"$c1.equals($c2)"
@@ -652,7 +652,7 @@ class CodegenContext extends Logging {
     case FloatType =>
       val clsName = SQLOrderingUtil.getClass.getName.stripSuffix("$")
       s"$clsName.compareFloats($c1, $c2)"
-    case st: StringType if st.isDefaultCollation => s"$c1.binaryCompare($c2)"
+    case st: StringType if st.supportsBinaryOrdering => 
s"$c1.binaryCompare($c2)"
     case st: StringType => s"$c1.semanticCompare($c2, ${st.collationId})"
     case dt: DataType if isPrimitiveType(dt) => s"($c1 > $c2 ? 1 : $c1 < $c2 ? 
-1 : 0)"
     case BinaryType => 
s"org.apache.spark.unsafe.types.ByteArray.compareBinary($c1, $c2)"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index 0bd815ef7694..436efa892416 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -407,7 +407,7 @@ abstract class HashExpression[E] extends Expression {
 
   protected def genHashString(
       ctx: CodegenContext, stringType: StringType, input: String, result: 
String): String = {
-    if (stringType.isBinaryCollation) {
+    if (stringType.supportsBinaryEquality) {
       val baseObject = s"$input.getBaseObject()"
       val baseOffset = s"$input.getBaseOffset()"
       val numBytes = s"$input.numBytes()"
@@ -801,7 +801,7 @@ case class HiveHash(children: Seq[Expression]) extends 
HashExpression[Int] {
 
   override protected def genHashString(
       ctx: CodegenContext, stringType: StringType, input: String, result: 
String): String = {
-    if (stringType.isBinaryCollation) {
+    if (stringType.supportsBinaryEquality) {
       val baseObject = s"$input.getBaseObject()"
       val baseOffset = s"$input.getBaseOffset()"
       val numBytes = s"$input.numBytes()"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index eadd4c04f4b3..1b20da0b5cbc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -50,7 +50,7 @@ import 
org.apache.spark.sql.catalyst.util.DateTimeUtils.instantToMicros
 import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE
 import org.apache.spark.sql.catalyst.util.IntervalUtils.{durationToMicros, 
periodToMonths, toDayTimeIntervalString, toYearMonthIntervalString}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SqlApiConf, SQLConf}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types._
 import org.apache.spark.util.Utils
@@ -69,10 +69,11 @@ object Literal {
     case f: Float => Literal(f, FloatType)
     case b: Byte => Literal(b, ByteType)
     case s: Short => Literal(s, ShortType)
-    case s: String => Literal(UTF8String.fromString(s), StringType)
-    case s: UTF8String => Literal(s, StringType)
-    case c: Char => Literal(UTF8String.fromString(c.toString), StringType)
-    case ac: Array[Char] => Literal(UTF8String.fromString(String.valueOf(ac)), 
StringType)
+    case s: String => Literal(UTF8String.fromString(s), 
SqlApiConf.get.defaultStringType)
+    case s: UTF8String => Literal(s, SqlApiConf.get.defaultStringType)
+    case c: Char => Literal(UTF8String.fromString(c.toString), 
SqlApiConf.get.defaultStringType)
+    case ac: Array[Char] =>
+      Literal(UTF8String.fromString(String.valueOf(ac)), 
SqlApiConf.get.defaultStringType)
     case b: Boolean => Literal(b, BooleanType)
     case d: BigDecimal =>
       val decimal = Decimal(d)
@@ -130,7 +131,7 @@ object Literal {
     case _ if clz == classOf[Period] => YearMonthIntervalType()
     case _ if clz == classOf[JavaBigDecimal] => DecimalType.SYSTEM_DEFAULT
     case _ if clz == classOf[Array[Byte]] => BinaryType
-    case _ if clz == classOf[Array[Char]] => StringType
+    case _ if clz == classOf[Array[Char]] => SqlApiConf.get.defaultStringType
     case _ if clz == classOf[JavaShort] => ShortType
     case _ if clz == classOf[JavaInteger] => IntegerType
     case _ if clz == classOf[JavaLong] => LongType
@@ -140,7 +141,7 @@ object Literal {
     case _ if clz == classOf[JavaBoolean] => BooleanType
 
     // other scala classes
-    case _ if clz == classOf[String] => StringType
+    case _ if clz == classOf[String] => SqlApiConf.get.defaultStringType
     case _ if clz == classOf[BigInt] => DecimalType.SYSTEM_DEFAULT
     case _ if clz == classOf[BigDecimal] => DecimalType.SYSTEM_DEFAULT
     case _ if clz == classOf[CalendarInterval] => CalendarIntervalType
@@ -320,7 +321,7 @@ object LongLiteral {
  */
 object StringLiteral {
   def unapply(a: Any): Option[String] = a match {
-    case Literal(s: UTF8String, StringType) => Some(s.toString)
+    case Literal(s: UTF8String, _: StringType) => Some(s.toString)
     case _ => None
   }
 }
@@ -484,7 +485,7 @@ case class Literal (value: Any, dataType: DataType) extends 
LeafExpression {
   override def sql: String = (value, dataType) match {
     case (_, NullType | _: ArrayType | _: MapType | _: StructType) if value == 
null => "NULL"
     case _ if value == null => s"CAST(NULL AS ${dataType.sql})"
-    case (v: UTF8String, StringType) =>
+    case (v: UTF8String, _: StringType) =>
       // Escapes all backslashes and single quotes.
       "'" + v.toString.replace("\\", "\\\\").replace("'", "\\'") + "'"
     case (v: Byte, ByteType) => s"${v}Y"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index 47a4771e663f..e73dc5f2ee1b 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -598,14 +598,14 @@ object ContainsExpressionBuilder extends 
StringBinaryPredicateExpressionBuilderB
 
 case class Contains(left: Expression, right: Expression) extends 
StringPredicate {
   override def compare(l: UTF8String, r: UTF8String): Boolean = {
-    if (CollationFactory.fetchCollation(collationId).isBinaryCollation) {
+    if (CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
       l.contains(r)
     } else {
       l.contains(r, collationId)
     }
   }
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    if (CollationFactory.fetchCollation(collationId).isBinaryCollation) {
+    if (CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
       defineCodeGen(ctx, ev, (c1, c2) => s"$c1.contains($c2)")
     } else {
       defineCodeGen(ctx, ev, (c1, c2) => s"$c1.contains($c2, $collationId)")
@@ -645,7 +645,7 @@ object StartsWithExpressionBuilder extends 
StringBinaryPredicateExpressionBuilde
 
 case class StartsWith(left: Expression, right: Expression) extends 
StringPredicate {
   override def compare(l: UTF8String, r: UTF8String): Boolean = {
-    if (CollationFactory.fetchCollation(collationId).isBinaryCollation) {
+    if (CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
       l.startsWith(r)
     } else {
       l.startsWith(r, collationId)
@@ -653,7 +653,7 @@ case class StartsWith(left: Expression, right: Expression) 
extends StringPredica
   }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    if (CollationFactory.fetchCollation(collationId).isBinaryCollation) {
+    if (CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
       defineCodeGen(ctx, ev, (c1, c2) => s"$c1.startsWith($c2)")
     } else {
       defineCodeGen(ctx, ev, (c1, c2) => s"$c1.startsWith($c2, $collationId)")
@@ -693,7 +693,7 @@ object EndsWithExpressionBuilder extends 
StringBinaryPredicateExpressionBuilderB
 
 case class EndsWith(left: Expression, right: Expression) extends 
StringPredicate {
   override def compare(l: UTF8String, r: UTF8String): Boolean = {
-    if (CollationFactory.fetchCollation(collationId).isBinaryCollation) {
+    if (CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
       l.endsWith(r)
     } else {
       l.endsWith(r, collationId)
@@ -701,7 +701,7 @@ case class EndsWith(left: Expression, right: Expression) 
extends StringPredicate
   }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    if (CollationFactory.fetchCollation(collationId).isBinaryCollation) {
+    if (CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
       defineCodeGen(ctx, ev, (c1, c2) => s"$c1.endsWith($c2)")
     } else {
       defineCodeGen(ctx, ev, (c1, c2) => s"$c1.endsWith($c2, $collationId)")
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
index 0b0c36b27e71..c43b81915a70 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
@@ -23,6 +23,7 @@ import scala.reflect.runtime.universe.typeTag
 import org.apache.spark.sql.catalyst.expressions.{Ascending, BoundReference, 
InterpretedOrdering, SortOrder}
 import org.apache.spark.sql.catalyst.util.{ArrayData, CollationFactory, 
SQLOrderingUtil}
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.SqlApiConf
 import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, 
ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType, 
DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType, 
DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType, 
IntegerExactNumeric, IntegerType, IntegralType, LongExactNumeric, LongType, 
MapType, NullType, NumericType, ShortExactNumeric, ShortType, StringType, 
StructField, StructType, TimestampNTZType, Timest [...]
 import org.apache.spark.unsafe.types.{ByteArray, UTF8String, VariantVal}
 import org.apache.spark.util.ArrayImplicits._
@@ -40,8 +41,8 @@ object PhysicalDataType {
     case ShortType => PhysicalShortType
     case IntegerType => PhysicalIntegerType
     case LongType => PhysicalLongType
-    case VarcharType(_) => 
PhysicalStringType(CollationFactory.DEFAULT_COLLATION_ID)
-    case CharType(_) => 
PhysicalStringType(CollationFactory.DEFAULT_COLLATION_ID)
+    case VarcharType(_) => 
PhysicalStringType(SqlApiConf.get.defaultStringType.collationId)
+    case CharType(_) => 
PhysicalStringType(SqlApiConf.get.defaultStringType.collationId)
     case s: StringType => PhysicalStringType(s.collationId)
     case FloatType => PhysicalFloatType
     case DoubleType => PhysicalDoubleType
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
index 747a0e225a2f..deb817a0cdb7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
@@ -163,9 +163,9 @@ object GeneratedColumn {
         s"generation expression data type ${analyzed.dataType.simpleString} " +
         s"is incompatible with column data type ${dataType.simpleString}")
     }
-    if (analyzed.exists(e => 
SchemaUtils.hasNonDefaultCollatedString(e.dataType))) {
+    if (analyzed.exists(e => 
SchemaUtils.hasNonBinarySortableCollatedString(e.dataType))) {
       throw unsupportedExpressionError(
-        "generation expression cannot contain non-default collated string 
type")
+        "generation expression cannot contain non-binary orderable collated 
string type")
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
index f74b136235b0..d2c708b380cf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
@@ -107,7 +107,7 @@ object TypeUtils extends QueryErrorsBase {
    */
   def typeWithProperEquals(dataType: DataType): Boolean = dataType match {
     case BinaryType => false
-    case s: StringType => s.isBinaryCollation
+    case s: StringType => s.supportsBinaryEquality
     case _: AtomicType => true
     case _ => false
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
index 0c1ce5ffa8b0..e296b5be6134 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/UnsafeRowUtils.scala
@@ -205,7 +205,7 @@ object UnsafeRowUtils {
    * can lead to rows being semantically equal even though their binary 
representations differ).
    */
   def isBinaryStable(dataType: DataType): Boolean = 
!dataType.existsRecursively {
-    case st: StringType => 
!CollationFactory.fetchCollation(st.collationId).isBinaryCollation
+    case st: StringType => 
!CollationFactory.fetchCollation(st.collationId).supportsBinaryEquality
     case _ => false
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4934aae3e90a..9a5e6b271a15 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -41,10 +41,10 @@ import 
org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver}
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
 import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{CollationFactory, DateTimeUtils}
 import 
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
-import org.apache.spark.sql.types.{AtomicType, TimestampNTZType, TimestampType}
+import org.apache.spark.sql.types.{AtomicType, StringType, TimestampNTZType, 
TimestampType}
 import org.apache.spark.storage.{StorageLevel, StorageLevelMapper}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.{Utils, VersionUtils}
@@ -757,6 +757,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(Utils.isTesting)
 
+  val DEFAULT_COLLATION =
+    buildConf(SqlApiConfHelper.DEFAULT_COLLATION)
+      .doc("Sets default collation to use for string literals, parameter 
markers or the string" +
+        " produced by a builtin function such as to_char or CAST")
+      .version("4.0.0")
+      .stringConf
+      .createWithDefault("UTF8_BINARY")
+
   val FETCH_SHUFFLE_BLOCKS_IN_BATCH =
     buildConf("spark.sql.adaptive.fetchShuffleBlocksInBatch")
       .internal()
@@ -5010,6 +5018,14 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def collationEnabled: Boolean = getConf(COLLATION_ENABLED)
 
+  override def defaultStringType: StringType = {
+    if (getConf(DEFAULT_COLLATION).toUpperCase(Locale.ROOT) == "UTF8_BINARY") {
+      StringType
+    } else {
+      
StringType(CollationFactory.collationNameToId(getConf(DEFAULT_COLLATION)))
+    }
+  }
+
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
   def adaptiveExecutionLogLevel: String = getConf(ADAPTIVE_EXECUTION_LOG_LEVEL)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
index db547baf84d2..d459d2dd1227 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
@@ -297,9 +297,9 @@ private[spark] object SchemaUtils {
   /**
    * Checks if a given data type contains a collated string type that is not binary sortable.
    */
-  def hasNonDefaultCollatedString(dt: DataType): Boolean = {
+  def hasNonBinarySortableCollatedString(dt: DataType): Boolean = {
     dt.existsRecursively {
-      case st: StringType => !st.isDefaultCollation
+      case st: StringType => !st.supportsBinaryOrdering
       case _ => false
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala
index daf5888d00c4..45a71b4da728 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashMapGenerator.scala
@@ -173,8 +173,8 @@ abstract class HashMapGenerator(
             ${hashBytes(bytes)}
           """
         }
-      case st: StringType if st.isBinaryCollation => 
hashBytes(s"$input.getBytes()")
-      case st: StringType if !st.isBinaryCollation =>
+      case st: StringType if st.supportsBinaryEquality => 
hashBytes(s"$input.getBytes()")
+      case st: StringType if !st.supportsBinaryEquality =>
         hashLong(s"CollationFactory.fetchCollation(${st.collationId})" +
           s".hashFunction.applyAsLong($input)")
       case CalendarIntervalType => hashInt(s"$input.hashCode()")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
index ccabaf8d8b12..ee1f9b413302 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
@@ -25,7 +25,6 @@ import scala.annotation.tailrec
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.{PhysicalArrayType, 
PhysicalBinaryType, PhysicalBooleanType, PhysicalByteType, 
PhysicalCalendarIntervalType, PhysicalDataType, PhysicalDecimalType, 
PhysicalDoubleType, PhysicalFloatType, PhysicalIntegerType, PhysicalLongType, 
PhysicalMapType, PhysicalNullType, PhysicalShortType, PhysicalStringType, 
PhysicalStructType}
-import org.apache.spark.sql.catalyst.util.CollationFactory
 import org.apache.spark.sql.errors.ExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
@@ -493,7 +492,7 @@ private[columnar] trait DirectCopyColumnType[JvmType] 
extends ColumnType[JvmType
 }
 
 private[columnar] object STRING
-  extends 
NativeColumnType(PhysicalStringType(CollationFactory.DEFAULT_COLLATION_ID), 8)
+  extends NativeColumnType(PhysicalStringType(StringType.collationId), 8)
     with DirectCopyColumnType[UTF8String] {
 
   override def actualSize(row: InternalRow, ordinal: Int): Int = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
index c107c01287ec..4fa1e0c1f2c5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
@@ -54,7 +54,7 @@ object BucketingUtils {
   }
 
   def canBucketOn(dataType: DataType): Boolean = dataType match {
-    case st: StringType => st.isDefaultCollation
+    case st: StringType => st.supportsBinaryOrdering
     case other => true
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 5d7cda57b15b..38567c16fd1f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -291,7 +291,7 @@ object DataSourceUtils extends PredicateHelper {
       case childExpression @ (_: Attribute | _: GetStructField) =>
        // don't push down filters for types with non-binary-sortable collation
         // as it could lead to incorrect results
-        SchemaUtils.hasNonDefaultCollatedString(childExpression.dataType)
+        
SchemaUtils.hasNonBinarySortableCollatedString(childExpression.dataType)
 
       case _ => false
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 9ab66917c2ce..56cba0e0561d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -579,7 +579,7 @@ object PartitioningUtils extends SQLConfHelper {
   def canPartitionOn(dateType: DataType): Boolean = dateType match {
    // strings with non-binary-orderable collations should not be used as partition columns,
    // as we cannot implement string collation semantics with directory names
-    case st: StringType => st.isDefaultCollation
+    case st: StringType => st.supportsBinaryOrdering
     case a: AtomicType => !a.isInstanceOf[VariantType]
     case _ => false
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index f0b51a5b2c19..e4b2dc2aa06a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, 
ObjectHashAggregateExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
SortMergeJoinExec}
+import org.apache.spark.sql.internal.SqlApiConf
 import org.apache.spark.sql.types.{MapType, StringType, StructField, 
StructType}
 
 class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
@@ -635,7 +636,8 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
       parameters = Map(
         "fieldName" -> "c2",
         "expressionStr" -> "SUBSTRING(c1, 0, 1)",
-        "reason" -> "generation expression cannot contain non-default collated 
string type"))
+        "reason" ->
+          "generation expression cannot contain non-binary orderable collated 
string type"))
 
     checkError(
       exception = intercept[AnalysisException] {
@@ -652,7 +654,8 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
       parameters = Map(
         "fieldName" -> "c2",
         "expressionStr" -> "LOWER(c1)",
-        "reason" -> "generation expression cannot contain non-default collated 
string type"))
+        "reason" ->
+          "generation expression cannot contain non-binary orderable collated 
string type"))
 
     checkError(
       exception = intercept[AnalysisException] {
@@ -669,7 +672,37 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
       parameters = Map(
         "fieldName" -> "c2",
         "expressionStr" -> "UCASE(struct1.a)",
-        "reason" -> "generation expression cannot contain non-default collated 
string type"))
+        "reason" ->
+          "generation expression cannot contain non-binary orderable collated 
string type"))
+  }
+
+  test("SPARK-47431: Default collation set to UNICODE, literal test") {
+    withSQLConf(SqlApiConf.DEFAULT_COLLATION -> "UNICODE") {
+      checkAnswer(sql(s"SELECT collation('aa')"), Seq(Row("UNICODE")))
+    }
+  }
+
+  test("SPARK-47431: Default collation set to UNICODE, column type test") {
+    withTable("t") {
+      withSQLConf(SqlApiConf.DEFAULT_COLLATION -> "UNICODE") {
+        sql(s"CREATE TABLE t(c1 STRING) USING PARQUET")
+        sql(s"INSERT INTO t VALUES ('a')")
+        checkAnswer(sql(s"SELECT collation(c1) FROM t"), Seq(Row("UNICODE")))
+      }
+    }
+  }
+
+  test("SPARK-47431: Create table with UTF8_BINARY, make sure collation 
persists on read") {
+    withTable("t") {
+      withSQLConf(SqlApiConf.DEFAULT_COLLATION -> "UTF8_BINARY") {
+        sql("CREATE TABLE t(c1 STRING) USING PARQUET")
+        sql("INSERT INTO t VALUES ('a')")
+        checkAnswer(sql("SELECT collation(c1) FROM t"), 
Seq(Row("UTF8_BINARY")))
+      }
+      withSQLConf(SqlApiConf.DEFAULT_COLLATION -> "UNICODE") {
+        checkAnswer(sql("SELECT collation(c1) FROM t"), 
Seq(Row("UTF8_BINARY")))
+      }
+    }
   }
 
   test("Aggregation on complex containing collated strings") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Reply via email to