This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 92948e73713f [SPARK-50675][SQL] Table and view level collations support
92948e73713f is described below
commit 92948e73713f6f6629e1610ed0975fa8e619f1a8
Author: Dejan Krakovic <[email protected]>
AuthorDate: Thu Dec 26 17:05:24 2024 +0800
[SPARK-50675][SQL] Table and view level collations support
### What changes were proposed in this pull request?
This change introduces table- and view-level collation support in Spark
SQL, allowing the CREATE TABLE, ALTER TABLE and CREATE VIEW commands to
specify a DEFAULT COLLATION. For CREATE commands, the default collation
applies to all columns added as part of the table/view creation. For the
ALTER TABLE command, it applies only to columns added in the future;
existing columns are not affected, i.e. their collation remains the same.
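For illustration, a minimal sketch of the intended semantics (the table and
column names are made up for the example; the collation names come from the
tests in this change):

CREATE TABLE t (c1 STRING) USING parquet DEFAULT COLLATION UNICODE;
-- c1 is created under the table's default collation, per the description above.
ALTER TABLE t DEFAULT COLLATION UNICODE_CI;
ALTER TABLE t ADD COLUMN c2 STRING;
-- c2 picks up the new default (UNICODE_CI); c1's collation is unchanged.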
The PR is modelled after the original changes made by stefankandic in
https://github.com/apache/spark/pull/48090. This PR covers table- and
view-level collations; a follow-up PR will cover schema-level collations.
This PR adds/extends the corresponding DDL commands for specifying a
table/view-level collation, whereas a separate follow-up PR will leverage
the table/view collation to determine the default collations for the input
queries of DML commands.
### Why are the changes needed?
Based on feedback from our internal users, many people would like to
specify a collation for their objects rather than for each individual
column. This change adds support for table- and view-level collations;
subsequent changes will add support for other objects, such as
schema-level collations.
### Does this PR introduce _any_ user-facing change?
The change follows the agreed-upon syntax additions for collation support.
The following syntax is now supported (**bold** parts denote additions):
{ { [CREATE OR] REPLACE TABLE | CREATE [EXTERNAL] TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ USING data_source ]
[ table_clauses ]
[ AS query ] }
table_specification
( { column_identifier column_type [ column_properties ] } [, ...]
[ , table_constraint ] [...] )
table_clauses
{ OPTIONS clause |
PARTITIONED BY clause |
CLUSTER BY clause |
clustered_by_clause |
LOCATION path [ WITH ( CREDENTIAL credential_name ) ] |
COMMENT table_comment |
TBLPROPERTIES clause |
**DEFAULT COLLATION table_collation_name |**
WITH { ROW FILTER clause } } [...]
CREATE [ OR REPLACE ] [ TEMPORARY ] VIEW [ IF NOT EXISTS ] view_name
[ column_list ]
[ schema_binding |
COMMENT view_comment |
TBLPROPERTIES clause |
**DEFAULT COLLATION collation_name** ] [...]
AS query
ALTER TABLE table_name
{ ADD COLUMN clause |
ALTER COLUMN clause |
DROP COLUMN clause |
RENAME COLUMN clause |
**DEFAULT COLLATION clause | …**
}
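As a concrete illustration (object names are arbitrary and chosen only for
the example; the collation names appear in the tests added by this change),
each of the commands above can now carry the new clause:

CREATE TABLE my_tab (id STRING) USING parquet DEFAULT COLLATION UNICODE;
CREATE OR REPLACE VIEW my_view DEFAULT COLLATION UNICODE_CI
  AS SELECT * FROM my_tab;
ALTER TABLE my_tab DEFAULT COLLATION sr_CI_AI;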
### How was this patch tested?
Tests for the new syntax/functionality were added as part of the change.
In addition, some existing tests were extended/amended to cover the new
DEFAULT COLLATION clause for table/view objects.
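For instance (an illustrative sketch based on the extended DESCRIBE tests in
this change; the table name is arbitrary), the default collation now
surfaces as a dedicated row in the extended table description:

CREATE TABLE tbl (id STRING) USING parquet DEFAULT COLLATION unicode;
DESCRIBE TABLE EXTENDED tbl;
-- the output now includes a row: Collation | UNICODE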
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49084 from dejankrak-db/object-level-collations.
Authored-by: Dejan Krakovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/parser/SqlBaseParser.g4 | 7 +++
.../spark/sql/connector/catalog/TableCatalog.java | 5 ++
.../sql/catalyst/analysis/ResolveTableSpec.scala | 1 +
.../spark/sql/catalyst/catalog/interface.scala | 2 +
.../spark/sql/catalyst/parser/AstBuilder.scala | 54 +++++++++++++++++----
.../plans/logical/v2AlterTableCommands.scala | 12 +++++
.../sql/catalyst/plans/logical/v2Commands.scala | 6 ++-
.../sql/connector/catalog/CatalogV2Util.scala | 5 +-
.../spark/sql/connector/catalog/V1Table.scala | 1 +
.../CreateTablePartitioningValidationSuite.scala | 2 +-
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 55 ++++++++++++++++++++--
.../sql/connect/planner/SparkConnectPlanner.scala | 1 +
.../main/scala/org/apache/spark/sql/Dataset.scala | 1 +
.../catalyst/analysis/ResolveSessionCatalog.scala | 14 ++++--
.../spark/sql/execution/SparkSqlParser.scala | 6 ++-
.../apache/spark/sql/execution/command/views.scala | 5 +-
.../execution/datasources/v2/CacheTableExec.scala | 1 +
.../datasources/v2/ShowCreateTableExec.scala | 7 +++
.../datasources/v2/V2SessionCatalog.scala | 6 ++-
.../apache/spark/sql/internal/CatalogImpl.scala | 1 +
.../spark/sql/internal/DataFrameWriterImpl.scala | 3 ++
.../spark/sql/internal/DataFrameWriterV2Impl.scala | 2 +
.../spark/sql/streaming/DataStreamWriter.scala | 1 +
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 10 +++-
.../connector/V2CommandsCaseSensitivitySuite.scala | 12 +++--
.../AlterTableSetTblPropertiesSuiteBase.scala | 6 +--
.../AlterTableUnsetTblPropertiesSuiteBase.scala | 6 +--
.../execution/command/DDLCommandTestUtils.scala | 6 +++
.../sql/execution/command/DDLParserSuite.scala | 6 +++
.../execution/command/DescribeTableSuiteBase.scala | 25 ++++++++++
.../execution/command/v1/DescribeTableSuite.scala | 2 +
.../spark/sql/hive/client/HiveClientImpl.scala | 5 ++
.../spark/sql/hive/client/HiveClientSuite.scala | 19 ++++++++
33 files changed, 257 insertions(+), 38 deletions(-)
diff --git
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index e743aa2a744f..a5d217486bf2 100644
---
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -236,6 +236,7 @@ statement
| ALTER TABLE identifierReference RECOVER PARTITIONS
#recoverPartitions
| ALTER TABLE identifierReference
(clusterBySpec | CLUSTER BY NONE)
#alterClusterBy
+ | ALTER TABLE identifierReference collationSpec
#alterTableCollation
| DROP TABLE (IF EXISTS)? identifierReference PURGE?
#dropTable
| DROP VIEW (IF EXISTS)? identifierReference
#dropView
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
@@ -243,6 +244,7 @@ statement
identifierCommentList?
(commentSpec |
schemaBinding |
+ collationSpec |
(PARTITIONED ON identifierList) |
(TBLPROPERTIES propertyList))*
AS query
#createView
@@ -528,6 +530,7 @@ createTableClauses
createFileFormat |
locationSpec |
commentSpec |
+ collationSpec |
(TBLPROPERTIES tableProps=propertyList))*
;
@@ -1232,6 +1235,10 @@ colPosition
: position=FIRST | position=AFTER afterCol=errorCapturingIdentifier
;
+collationSpec
+ : DEFAULT COLLATION collationName=identifier
+ ;
+
collateClause
: COLLATE collationName=multipartIdentifier
;
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index ba3470f85338..77dbaa7687b4 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -67,6 +67,11 @@ public interface TableCatalog extends CatalogPlugin {
*/
String PROP_COMMENT = "comment";
+ /**
+ * A reserved property to specify the collation of the table.
+ */
+ String PROP_COLLATION = "collation";
+
/**
* A reserved property to specify the provider of the table.
*/
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
index cc9979ad4c5e..05158fbee3de 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
@@ -92,6 +92,7 @@ object ResolveTableSpec extends Rule[LogicalPlan] {
options = newOptions.toMap,
location = u.location,
comment = u.comment,
+ collation = u.collation,
serde = u.serde,
external = u.external)
withNewSpec(newTableSpec)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index dcd1d3137da3..32a90833e2e7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -350,6 +350,7 @@ case class CatalogTable(
stats: Option[CatalogStatistics] = None,
viewText: Option[String] = None,
comment: Option[String] = None,
+ collation: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty,
tracksPartitionsInCatalog: Boolean = false,
schemaPreservesCase: Boolean = true,
@@ -546,6 +547,7 @@ case class CatalogTable(
provider.foreach(map.put("Provider", _))
bucketSpec.foreach(map ++= _.toLinkedHashMap)
comment.foreach(map.put("Comment", _))
+ collation.foreach(map.put("Collation", _))
if (tableType == CatalogTableType.VIEW) {
viewText.foreach(map.put("View Text", _))
viewOriginalText.foreach(map.put("View Original Text", _))
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f37879ecd935..aa32cc910051 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils,
IntervalUtils, SparkParserUtils}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory,
DateTimeUtils, IntervalUtils, SparkParserUtils}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate,
convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate,
stringToTimestamp, stringToTimestampWithoutTimeZone}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util,
SupportsNamespaces, TableCatalog, TableWritePrivilege}
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -3869,6 +3869,16 @@ class AstBuilder extends DataTypeAstBuilder
ctx.asScala.headOption.map(visitCommentSpec)
}
+ protected def visitCollationSpecList(
+ ctx: java.util.List[CollationSpecContext]): Option[String] = {
+ ctx.asScala.headOption.map(visitCollationSpec)
+ }
+
+ override def visitCollationSpec(ctx: CollationSpecContext): String =
withOrigin(ctx) {
+ val collationName = ctx.identifier.getText
+ CollationFactory.fetchCollation(collationName).collationName
+ }
+
/**
* Create a [[BucketSpec]].
*/
@@ -4000,6 +4010,7 @@ class AstBuilder extends DataTypeAstBuilder
* - options
* - location
* - comment
+ * - collation
* - serde
* - clusterBySpec
*
@@ -4008,8 +4019,8 @@ class AstBuilder extends DataTypeAstBuilder
* types like `i INT`, which should be appended to the existing table schema.
*/
type TableClauses = (
- Seq[Transform], Seq[ColumnDefinition], Option[BucketSpec], Map[String,
String],
- OptionList, Option[String], Option[String], Option[SerdeInfo],
Option[ClusterBySpec])
+ Seq[Transform], Seq[ColumnDefinition], Option[BucketSpec], Map[String,
String], OptionList,
+ Option[String], Option[String], Option[String], Option[SerdeInfo],
Option[ClusterBySpec])
/**
* Validate a create table statement and return the [[TableIdentifier]].
@@ -4296,6 +4307,10 @@ class AstBuilder extends DataTypeAstBuilder
throw QueryParsingErrors.cannotCleanReservedTablePropertyError(
PROP_EXTERNAL, ctx, "please use CREATE EXTERNAL TABLE")
case (PROP_EXTERNAL, _) => false
+ case (PROP_COLLATION, _) if !legacyOn =>
+ throw QueryParsingErrors.cannotCleanReservedTablePropertyError(
+ PROP_COLLATION, ctx, "please use the DEFAULT COLLATION clause to
specify it")
+ case (PROP_COLLATION, _) => false
// It's safe to set whatever table comment, so we don't make it a
reserved table property.
case (PROP_COMMENT, _) => true
case (k, _) =>
@@ -4475,6 +4490,7 @@ class AstBuilder extends DataTypeAstBuilder
checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx)
checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
+ checkDuplicateClauses(ctx.collationSpec(), "DEFAULT COLLATION", ctx)
checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
checkDuplicateClauses(ctx.clusterBySpec(), "CLUSTER BY", ctx)
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
@@ -4493,6 +4509,7 @@ class AstBuilder extends DataTypeAstBuilder
val location = visitLocationSpecList(ctx.locationSpec())
val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options,
location)
val comment = visitCommentSpecList(ctx.commentSpec())
+ val collation = visitCollationSpecList(ctx.collationSpec())
val serdeInfo =
getSerdeInfo(ctx.rowFormat.asScala.toSeq,
ctx.createFileFormat.asScala.toSeq, ctx)
val clusterBySpec =
ctx.clusterBySpec().asScala.headOption.map(visitClusterBySpec)
@@ -4507,7 +4524,7 @@ class AstBuilder extends DataTypeAstBuilder
}
(partTransforms, partCols, bucketSpec, cleanedProperties, cleanedOptions,
newLocation, comment,
- serdeInfo, clusterBySpec)
+ collation, serdeInfo, clusterBySpec)
}
protected def getSerdeInfo(
@@ -4567,6 +4584,7 @@ class AstBuilder extends DataTypeAstBuilder
* ]
* [LOCATION path]
* [COMMENT table_comment]
+ * [DEFAULT COLLATION collation_name]
* [TBLPROPERTIES (property_name=property_value, ...)]
*
* partition_fields:
@@ -4580,8 +4598,8 @@ class AstBuilder extends DataTypeAstBuilder
val columns =
Option(ctx.colDefinitionList()).map(visitColDefinitionList).getOrElse(Nil)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
- val (partTransforms, partCols, bucketSpec, properties, options, location,
- comment, serdeInfo, clusterBySpec) =
visitCreateTableClauses(ctx.createTableClauses())
+ val (partTransforms, partCols, bucketSpec, properties, options, location,
comment,
+ collation, serdeInfo, clusterBySpec) =
visitCreateTableClauses(ctx.createTableClauses())
if (provider.isDefined && serdeInfo.isDefined) {
invalidStatement(s"CREATE TABLE ... USING ...
${serdeInfo.get.describe}", ctx)
@@ -4599,7 +4617,7 @@ class AstBuilder extends DataTypeAstBuilder
clusterBySpec.map(_.asTransform)
val tableSpec = UnresolvedTableSpec(properties, provider, options,
location, comment,
- serdeInfo, external)
+ collation, serdeInfo, external)
Option(ctx.query).map(plan) match {
case Some(_) if columns.nonEmpty =>
@@ -4648,6 +4666,7 @@ class AstBuilder extends DataTypeAstBuilder
* ]
* [LOCATION path]
* [COMMENT table_comment]
+ * [DEFAULT COLLATION collation_name]
* [TBLPROPERTIES (property_name=property_value, ...)]
*
* partition_fields:
@@ -4657,8 +4676,8 @@ class AstBuilder extends DataTypeAstBuilder
*/
override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan =
withOrigin(ctx) {
val orCreate = ctx.replaceTableHeader().CREATE() != null
- val (partTransforms, partCols, bucketSpec, properties, options, location,
comment, serdeInfo,
- clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())
+ val (partTransforms, partCols, bucketSpec, properties, options, location,
comment, collation,
+ serdeInfo, clusterBySpec) =
visitCreateTableClauses(ctx.createTableClauses())
val columns =
Option(ctx.colDefinitionList()).map(visitColDefinitionList).getOrElse(Nil)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
@@ -4672,7 +4691,7 @@ class AstBuilder extends DataTypeAstBuilder
clusterBySpec.map(_.asTransform)
val tableSpec = UnresolvedTableSpec(properties, provider, options,
location, comment,
- serdeInfo, external = false)
+ collation, serdeInfo, external = false)
Option(ctx.query).map(plan) match {
case Some(_) if columns.nonEmpty =>
@@ -5078,6 +5097,21 @@ class AstBuilder extends DataTypeAstBuilder
}
}
+ /**
+ * Parse a [[AlterTableCollation]] command.
+ *
+ * For example:
+ * {{{
+ * ALTER TABLE table1 DEFAULT COLLATION name
+ * }}}
+ */
+ override def visitAlterTableCollation(ctx: AlterTableCollationContext):
LogicalPlan =
+ withOrigin(ctx) {
+ val table = createUnresolvedTable(
+ ctx.identifierReference, "ALTER TABLE ... DEFAULT COLLATION")
+ AlterTableCollation(table, visitCollationSpec(ctx.collationSpec()))
+ }
+
/**
* Parse [[SetViewProperties]] or [[SetTableProperties]] commands.
*
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index 2f5d4b9c86e2..dbd2c0ba8e42 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -261,3 +261,15 @@ case class AlterTableClusterBy(
protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}
+
+/**
+ * The logical plan of the ALTER TABLE ... DEFAULT COLLATION name command.
+ */
+case class AlterTableCollation(
+ table: LogicalPlan, collation: String) extends AlterTableCommand {
+ override def changes: Seq[TableChange] = {
+ Seq(TableChange.setProperty(TableCatalog.PROP_COLLATION, collation))
+ }
+
+ protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 857522728eaf..85b5e8379d3d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -1338,6 +1338,7 @@ case class CreateView(
child: LogicalPlan,
userSpecifiedColumns: Seq[(String, Option[String])],
comment: Option[String],
+ collation: Option[String],
properties: Map[String, String],
originalText: Option[String],
query: LogicalPlan,
@@ -1486,6 +1487,7 @@ trait TableSpecBase {
def provider: Option[String]
def location: Option[String]
def comment: Option[String]
+ def collation: Option[String]
def serde: Option[SerdeInfo]
def external: Boolean
}
@@ -1496,6 +1498,7 @@ case class UnresolvedTableSpec(
optionExpression: OptionList,
location: Option[String],
comment: Option[String],
+ collation: Option[String],
serde: Option[SerdeInfo],
external: Boolean) extends UnaryExpression with Unevaluable with
TableSpecBase {
@@ -1541,10 +1544,11 @@ case class TableSpec(
options: Map[String, String],
location: Option[String],
comment: Option[String],
+ collation: Option[String],
serde: Option[SerdeInfo],
external: Boolean) extends TableSpecBase {
def withNewLocation(newLocation: Option[String]): TableSpec = {
- TableSpec(properties, provider, options, newLocation, comment, serde,
external)
+ TableSpec(properties, provider, options, newLocation, comment, collation,
serde, external)
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index e1f114a6170a..97cc263c56c5 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -53,6 +53,7 @@ private[sql] object CatalogV2Util {
*/
val TABLE_RESERVED_PROPERTIES =
Seq(TableCatalog.PROP_COMMENT,
+ TableCatalog.PROP_COLLATION,
TableCatalog.PROP_LOCATION,
TableCatalog.PROP_PROVIDER,
TableCatalog.PROP_OWNER,
@@ -459,7 +460,7 @@ private[sql] object CatalogV2Util {
def convertTableProperties(t: TableSpec): Map[String, String] = {
val props = convertTableProperties(
t.properties, t.options, t.serde, t.location, t.comment,
- t.provider, t.external)
+ t.collation, t.provider, t.external)
withDefaultOwnership(props)
}
@@ -469,6 +470,7 @@ private[sql] object CatalogV2Util {
serdeInfo: Option[SerdeInfo],
location: Option[String],
comment: Option[String],
+ collation: Option[String],
provider: Option[String],
external: Boolean = false): Map[String, String] = {
properties ++
@@ -478,6 +480,7 @@ private[sql] object CatalogV2Util {
(if (external) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++
provider.map(TableCatalog.PROP_PROVIDER -> _) ++
comment.map(TableCatalog.PROP_COMMENT -> _) ++
+ collation.map(TableCatalog.PROP_COLLATION -> _) ++
location.map(TableCatalog.PROP_LOCATION -> _)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
index 4a5a607e8a8a..570ab1338dbf 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala
@@ -85,6 +85,7 @@ private[sql] object V1Table {
TableCatalog.OPTION_PREFIX + key -> value } ++
v1Table.provider.map(TableCatalog.PROP_PROVIDER -> _) ++
v1Table.comment.map(TableCatalog.PROP_COMMENT -> _) ++
+ v1Table.collation.map(TableCatalog.PROP_COLLATION -> _) ++
v1Table.storage.locationUri.map { loc =>
TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(loc)
} ++
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
index 6b034d3dbee0..133670d5fcce 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.util.ArrayImplicits._
class CreateTablePartitioningValidationSuite extends AnalysisTest {
val tableSpec =
- UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None,
None, false)
+ UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None,
None, None, false)
test("CreateTableAsSelect: fail missing top-level column") {
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name").toImmutableArraySeq),
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 5e871208698a..0ec2c80282fc 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2655,7 +2655,7 @@ class DDLParserSuite extends AnalysisTest {
val createTableResult =
CreateTable(UnresolvedIdentifier(Seq("my_tab")), columnsWithDefaultValue,
Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String],
Some("parquet"),
- OptionList(Seq.empty), None, None, None, false), false)
+ OptionList(Seq.empty), None, None, None, None, false), false)
// Parse the CREATE TABLE statement twice, swapping the order of the NOT
NULL and DEFAULT
// options, to make sure that the parser accepts any ordering of these
options.
comparePlans(parsePlan(
@@ -2668,7 +2668,7 @@ class DDLParserSuite extends AnalysisTest {
"b STRING NOT NULL DEFAULT 'abc') USING parquet"),
ReplaceTable(UnresolvedIdentifier(Seq("my_tab")),
columnsWithDefaultValue,
Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String],
Some("parquet"),
- OptionList(Seq.empty), None, None, None, false), false))
+ OptionList(Seq.empty), None, None, None, None, false), false))
// These ALTER TABLE statements should parse successfully.
comparePlans(
parsePlan("ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42"),
@@ -2828,12 +2828,12 @@ class DDLParserSuite extends AnalysisTest {
"CREATE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1))
USING parquet"),
CreateTable(UnresolvedIdentifier(Seq("my_tab")),
columnsWithGenerationExpr,
Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String],
Some("parquet"),
- OptionList(Seq.empty), None, None, None, false), false))
+ OptionList(Seq.empty), None, None, None, None, false), false))
comparePlans(parsePlan(
"REPLACE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1))
USING parquet"),
ReplaceTable(UnresolvedIdentifier(Seq("my_tab")),
columnsWithGenerationExpr,
Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String],
Some("parquet"),
- OptionList(Seq.empty), None, None, None, false), false))
+ OptionList(Seq.empty), None, None, None, None, false), false))
// Two generation expressions
checkError(
exception = parseException("CREATE TABLE my_tab(a INT, " +
@@ -2903,6 +2903,7 @@ class DDLParserSuite extends AnalysisTest {
None,
None,
None,
+ None,
false
),
false
@@ -2925,6 +2926,7 @@ class DDLParserSuite extends AnalysisTest {
None,
None,
None,
+ None,
false
),
false
@@ -3198,4 +3200,49 @@ class DDLParserSuite extends AnalysisTest {
condition = "INTERNAL_ERROR",
parameters = Map("message" -> "INSERT OVERWRITE DIRECTORY is not
supported."))
}
+
+ test("create table with bad collation name") {
+ checkError(
+ exception = internalException("CREATE TABLE t DEFAULT COLLATION XD"),
+ condition = "COLLATION_INVALID_NAME",
+ parameters = Map("proposals" -> "id, xh, af", "collationName" -> "XD")
+ )
+ }
+
+ private val testSuppCollations =
+ Seq("UTF8_BINARY", "UTF8_LCASE", "UNICODE", "UNICODE_CI",
"UNICODE_CI_RTRIM", "sr", "sr_CI_AI")
+
+ test("create table with default collation") {
+ testSuppCollations.foreach { collation =>
+ comparePlans(parsePlan(
+ s"CREATE TABLE t (c STRING) USING parquet DEFAULT COLLATION
${collation.toLowerCase()}"),
+ CreateTable(UnresolvedIdentifier(Seq("t")),
+ Seq(ColumnDefinition("c", StringType)),
+ Seq.empty[Transform],
+ UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
OptionList(Seq.empty),
+ None, None, Some(collation), None, false), false))
+ }
+ }
+
+ test("replace table with default collation") {
+ testSuppCollations.foreach { collation =>
+ comparePlans(parsePlan(
+ s"REPLACE TABLE t (c STRING) USING parquet DEFAULT COLLATION
${collation.toLowerCase()}"),
+ ReplaceTable(UnresolvedIdentifier(Seq("t")),
+ Seq(ColumnDefinition("c", StringType)),
+ Seq.empty[Transform],
+ UnresolvedTableSpec(Map.empty[String, String], Some("parquet"),
OptionList(Seq.empty),
+ None, None, Some(collation), None, false), false))
+ }
+ }
+
+ test("alter table collation") {
+ testSuppCollations.foreach { collation =>
+ comparePlans(parsePlan(
+ s"ALTER TABLE t DEFAULT COLLATION ${collation.toLowerCase()}"),
+ AlterTableCollation(UnresolvedTable(Seq("t"),
+ "ALTER TABLE ... DEFAULT COLLATION"), collation)
+ )
+ }
+ }
}
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 82dfcf7a3694..bfb5f8f3fab7 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2719,6 +2719,7 @@ class SparkConnectPlanner(
name = tableIdentifier,
userSpecifiedColumns = Nil,
comment = None,
+ collation = None,
properties = Map.empty,
originalText = None,
plan = transformRelation(createView.getInput),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c8c1bacfb9de..b9ae0e5b9131 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1618,6 +1618,7 @@ class Dataset[T] private[sql](
name = TableIdentifier(identifier.last),
userSpecifiedColumns = Nil,
comment = None,
+ collation = None,
properties = Map.empty,
originalText = None,
plan = logicalPlan,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 5f1ab089cf3e..fa28a2cb9ead 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -421,11 +421,12 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
AlterViewSchemaBindingCommand(ident, viewSchemaMode)
case CreateView(ResolvedIdentifierInSessionCatalog(ident),
userSpecifiedColumns, comment,
- properties, originalText, child, allowExisting, replace,
viewSchemaMode) =>
+ collation, properties, originalText, child, allowExisting, replace,
viewSchemaMode) =>
CreateViewCommand(
name = ident,
userSpecifiedColumns = userSpecifiedColumns,
comment = comment,
+ collation = collation,
properties = properties,
originalText = originalText,
plan = child,
@@ -434,7 +435,7 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
viewType = PersistedView,
viewSchemaMode = viewSchemaMode)
- case CreateView(ResolvedIdentifier(catalog, _), _, _, _, _, _, _, _, _) =>
+ case CreateView(ResolvedIdentifier(catalog, _), _, _, _, _, _, _, _, _, _)
=>
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "views")
case ShowViews(ns: ResolvedNamespace, pattern, output) =>
@@ -508,8 +509,8 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
storageFormat: CatalogStorageFormat,
provider: String): CreateTableV1 = {
val tableDesc = buildCatalogTable(
- ident, tableSchema, partitioning, tableSpec.properties, provider,
- tableSpec.location, tableSpec.comment, storageFormat, tableSpec.external)
+ ident, tableSchema, partitioning, tableSpec.properties, provider,
tableSpec.location,
+ tableSpec.comment, tableSpec.collation, storageFormat,
tableSpec.external)
val mode = if (ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableV1(tableDesc, mode, query)
}
@@ -585,6 +586,7 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
provider: String,
location: Option[String],
comment: Option[String],
+ collation: Option[String],
storageFormat: CatalogStorageFormat,
external: Boolean): CatalogTable = {
val tableType = if (external || location.isDefined) {
@@ -605,7 +607,9 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
properties = properties ++
maybeClusterBySpec.map(
clusterBySpec => ClusterBySpec.toProperty(schema, clusterBySpec,
conf.resolver)),
- comment = comment)
+ comment = comment,
+ collation = collation
+ )
}
object ResolvedViewIdentifier {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index e15250eb46b5..8d5ddb2d85c4 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -377,7 +377,7 @@ class SparkSqlAstBuilder extends AstBuilder {
invalidStatement("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
}
- val (_, _, _, _, options, location, _, _, _) =
+ val (_, _, _, _, options, location, _, _, _, _) =
visitCreateTableClauses(ctx.createTableClauses())
val provider =
Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse(
throw QueryParsingErrors.createTempTableNotSpecifyProviderError(ctx))
@@ -520,6 +520,7 @@ class SparkSqlAstBuilder extends AstBuilder {
*
* create_view_clauses (order insensitive):
* [COMMENT view_comment]
+ * [DEFAULT COLLATION collation_name]
* [TBLPROPERTIES (property_name = property_value, ...)]
* }}}
*/
@@ -529,6 +530,7 @@ class SparkSqlAstBuilder extends AstBuilder {
}
checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
+ checkDuplicateClauses(ctx.collationSpec(), "DEFAULT COLLATION", ctx)
checkDuplicateClauses(ctx.schemaBinding(), "WITH SCHEMA", ctx)
checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED ON", ctx)
checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
@@ -584,6 +586,7 @@ class SparkSqlAstBuilder extends AstBuilder {
withIdentClause(ctx.identifierReference(), UnresolvedIdentifier(_)),
userSpecifiedColumns,
visitCommentSpecList(ctx.commentSpec()),
+ visitCollationSpecList(ctx.collationSpec()),
properties,
Some(originalText),
qPlan,
@@ -609,6 +612,7 @@ class SparkSqlAstBuilder extends AstBuilder {
tableIdentifier,
userSpecifiedColumns,
visitCommentSpecList(ctx.commentSpec()),
+ visitCollationSpecList(ctx.collationSpec()),
properties,
Option(source(ctx.query)),
otherPlans.head,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index a98d9886a273..d5a72fd6c441 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -47,6 +47,7 @@ import org.apache.spark.util.ArrayImplicits._
* @param userSpecifiedColumns the output column names and optional comments
specified by users,
* can be Nil if not specified.
* @param comment the comment of this view.
+ * @param collation the collation of this view.
* @param properties the properties of this view.
* @param originalText the original SQL text of this view, can be None if this
view is created via
* Dataset API.
@@ -64,6 +65,7 @@ case class CreateViewCommand(
name: TableIdentifier,
userSpecifiedColumns: Seq[(String, Option[String])],
comment: Option[String],
+ collation: Option[String],
properties: Map[String, String],
originalText: Option[String],
plan: LogicalPlan,
@@ -220,7 +222,8 @@ case class CreateViewCommand(
properties = newProperties,
viewOriginalText = originalText,
viewText = originalText,
- comment = comment
+ comment = comment,
+ collation = collation
)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index 56c44a125681..86fa0c8523f1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -89,6 +89,7 @@ case class CacheTableAsSelectExec(
name = TableIdentifier(tempViewName),
userSpecifiedColumns = Nil,
comment = None,
+ collation = None,
properties = Map.empty,
originalText = Some(originalText),
plan = query,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
index 37339a34af3d..4195560c5cc1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
@@ -57,6 +57,7 @@ case class ShowCreateTableExec(
showTableOptions(builder, tableOptions)
showTablePartitioning(table, builder)
showTableComment(table, builder)
+ showTableCollation(table, builder)
showTableLocation(table, builder)
showTableProperties(table, builder, tableOptions)
}
@@ -155,6 +156,12 @@ case class ShowCreateTableExec(
.foreach(builder.append)
}
+ private def showTableCollation(table: Table, builder: StringBuilder): Unit =
{
+ Option(table.properties.get(TableCatalog.PROP_COLLATION))
+ .map("COLLATION '" + escapeSingleQuotedString(_) + "'\n")
+ .foreach(builder.append)
+ }
+
private def concatByMultiLines(iter: Iterable[String]): String = {
iter.mkString("(\n ", ",\n ", ")\n")
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 22c13fd98ced..e9927cdcc7a3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -239,7 +239,8 @@ class V2SessionCatalog(catalog: SessionCatalog)
maybeClusterBySpec.map(
clusterBySpec => ClusterBySpec.toProperty(newSchema, clusterBySpec,
conf.resolver)),
tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
- comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
+ comment = Option(properties.get(TableCatalog.PROP_COMMENT)),
+ collation = Option(properties.get(TableCatalog.PROP_COLLATION)))
try {
catalog.createTable(tableDesc, ignoreIfExists = false)
@@ -290,6 +291,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
val schema = CatalogV2Util.applySchemaChanges(
catalogTable.schema, changes, catalogTable.provider, "ALTER TABLE")
val comment = properties.get(TableCatalog.PROP_COMMENT)
+ val collation = properties.get(TableCatalog.PROP_COLLATION)
val owner = properties.getOrElse(TableCatalog.PROP_OWNER,
catalogTable.owner)
val location =
properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
val storage = if (location.isDefined) {
@@ -303,7 +305,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
catalog.alterTable(
catalogTable.copy(
properties = finalProperties, schema = schema, owner = owner,
comment = comment,
- storage = storage))
+ collation = collation, storage = storage))
} catch {
case _: NoSuchTableException =>
throw QueryCompilationErrors.noSuchTableError(ident)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 64689e75e2e5..5fd88b417ac4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -685,6 +685,7 @@ class CatalogImpl(sparkSession: SparkSession) extends
Catalog {
optionExpression = newOptions,
location = location,
comment = { if (description.isEmpty) None else Some(description) },
+ collation = None,
serde = None,
external = tableType == CatalogTableType.EXTERNAL)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
index 16f9fcf77d62..0069062e6307 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
@@ -209,6 +209,7 @@ final class DataFrameWriterImpl[T] private[sql](ds:
Dataset[T]) extends DataFram
optionExpression = OptionList(Seq.empty),
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
+ collation = extraOptions.get(TableCatalog.PROP_COLLATION),
serde = None,
external = false)
runCommand(df.sparkSession) {
@@ -469,6 +470,7 @@ final class DataFrameWriterImpl[T] private[sql](ds:
Dataset[T]) extends DataFram
optionExpression = OptionList(Seq.empty),
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
+ collation = extraOptions.get(TableCatalog.PROP_COLLATION),
serde = None,
external = false)
ReplaceTableAsSelect(
@@ -489,6 +491,7 @@ final class DataFrameWriterImpl[T] private[sql](ds:
Dataset[T]) extends DataFram
optionExpression = OptionList(Seq.empty),
location = extraOptions.get("path"),
comment = extraOptions.get(TableCatalog.PROP_COMMENT),
+ collation = extraOptions.get(TableCatalog.PROP_COLLATION),
serde = None,
external = false)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterV2Impl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterV2Impl.scala
index 0a19e6c47afa..86ea55bc59b7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterV2Impl.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterV2Impl.scala
@@ -150,6 +150,7 @@ final class DataFrameWriterV2Impl[T] private[sql](table:
String, ds: Dataset[T])
optionExpression = OptionList(Seq.empty),
location = None,
comment = None,
+ collation = None,
serde = None,
external = false)
runCommand(
@@ -215,6 +216,7 @@ final class DataFrameWriterV2Impl[T] private[sql](table:
String, ds: Dataset[T])
optionExpression = OptionList(Seq.empty),
location = None,
comment = None,
+ collation = None,
serde = None,
external = false)
runCommand(ReplaceTableAsSelect(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b0233d2c51b7..d41933c6a135 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -175,6 +175,7 @@ final class DataStreamWriter[T] private[sql](ds:
Dataset[T]) extends api.DataStr
extraOptions.get("path"),
None,
None,
+ None,
external = false)
val cmd = CreateTable(
UnresolvedIdentifier(originalMultipartIdentifier),
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 6a659fa6e3ee..87d0a1ff4e7b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1260,8 +1260,12 @@ class DataSourceV2SQLSuiteV1Filter
PROP_OWNER -> "it will be set to the current user",
PROP_EXTERNAL -> "please use CREATE EXTERNAL TABLE"
)
+ val excludedProperties = Set(TableCatalog.PROP_COMMENT,
TableCatalog.PROP_COLLATION)
+ val tableLegacyProperties = CatalogV2Util.TABLE_RESERVED_PROPERTIES
+ .filterNot(excludedProperties.contains)
+
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) {
- CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ ==
PROP_COMMENT).foreach { key =>
+ tableLegacyProperties.foreach { key =>
Seq("OPTIONS", "TBLPROPERTIES").foreach { clause =>
Seq("CREATE", "REPLACE").foreach { action =>
val sqlText = s"$action TABLE testcat.reservedTest (key int) " +
@@ -1314,7 +1318,7 @@ class DataSourceV2SQLSuiteV1Filter
}
}
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) {
- CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ ==
PROP_COMMENT).foreach { key =>
+ tableLegacyProperties.foreach { key =>
Seq("OPTIONS", "TBLPROPERTIES").foreach { clause =>
withTable("testcat.reservedTest") {
Seq("CREATE", "REPLACE").foreach { action =>
@@ -3389,6 +3393,7 @@ class DataSourceV2SQLSuiteV1Filter
|TBLPROPERTIES ('prop1' = '1', 'prop2' = '2')
|PARTITIONED BY (a)
|LOCATION '/tmp'
+ |DEFAULT COLLATION sr_CI_AI
""".stripMargin)
val table =
spark.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
@@ -3396,6 +3401,7 @@ class DataSourceV2SQLSuiteV1Filter
val properties = table.properties
assert(properties.get(TableCatalog.PROP_PROVIDER) == "parquet")
assert(properties.get(TableCatalog.PROP_COMMENT) == "This is a comment")
+ assert(properties.get(TableCatalog.PROP_COLLATION) == "sr_CI_AI")
assert(properties.get(TableCatalog.PROP_LOCATION) == "file:/tmp")
assert(properties.containsKey(TableCatalog.PROP_OWNER))
assert(properties.get(TableCatalog.PROP_EXTERNAL) == "true")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index 5091c72ef96a..67fca0980213 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -53,7 +53,8 @@ class V2CommandsCaseSensitivitySuite
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
Seq("ID", "iD").foreach { ref =>
val tableSpec =
- UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None,
None, None, false)
+ UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty),
+ None, None, None, None, false)
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name").toImmutableArraySeq),
Expressions.identity(ref) :: Nil,
@@ -77,7 +78,8 @@ class V2CommandsCaseSensitivitySuite
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
val tableSpec =
- UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None,
None, None, false)
+ UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty),
+ None, None, None, None, false)
val plan = CreateTableAsSelect(
UnresolvedIdentifier(Array("table_name").toImmutableArraySeq),
Expressions.bucket(4, ref) :: Nil,
@@ -102,7 +104,8 @@ class V2CommandsCaseSensitivitySuite
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
Seq("ID", "iD").foreach { ref =>
val tableSpec =
- UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None,
None, None, false)
+ UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty),
+ None, None, None, None, false)
val plan = ReplaceTableAsSelect(
UnresolvedIdentifier(Array("table_name").toImmutableArraySeq),
Expressions.identity(ref) :: Nil,
@@ -126,7 +129,8 @@ class V2CommandsCaseSensitivitySuite
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
val tableSpec =
- UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None,
None, None, false)
+ UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty),
+ None, None, None, None, false)
val plan = ReplaceTableAsSelect(
UnresolvedIdentifier(Array("table_name").toImmutableArraySeq),
Expressions.bucket(4, ref) :: Nil,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableSetTblPropertiesSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableSetTblPropertiesSuiteBase.scala
index 52a90497fdd3..9ec63acb1d3a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableSetTblPropertiesSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableSetTblPropertiesSuiteBase.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
+import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
import org.apache.spark.sql.internal.SQLConf
@@ -89,7 +89,7 @@ trait AlterTableSetTblPropertiesSuiteBase extends QueryTest
with DDLCommandTestU
PROP_EXTERNAL -> "please use CREATE EXTERNAL TABLE"
)
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) {
- CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ ==
PROP_COMMENT).foreach { key =>
+ tableLegacyProperties.foreach { key =>
withNamespaceAndTable("ns", "tbl") { t =>
val sqlText = s"ALTER TABLE $t SET TBLPROPERTIES ('$key'='bar')"
checkError(
@@ -109,7 +109,7 @@ trait AlterTableSetTblPropertiesSuiteBase extends QueryTest
with DDLCommandTestU
}
}
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) {
- CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ ==
PROP_COMMENT).foreach { key =>
+ tableLegacyProperties.foreach { key =>
Seq("OPTIONS", "TBLPROPERTIES").foreach { clause =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (key int) USING parquet $clause
('$key'='bar')")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableUnsetTblPropertiesSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableUnsetTblPropertiesSuiteBase.scala
index 0013919fca08..0e9e9d9c6081 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableUnsetTblPropertiesSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableUnsetTblPropertiesSuiteBase.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
+import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
import org.apache.spark.sql.internal.SQLConf
@@ -109,7 +109,7 @@ trait AlterTableUnsetTblPropertiesSuiteBase extends
QueryTest with DDLCommandTes
PROP_EXTERNAL -> "please use CREATE EXTERNAL TABLE"
)
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) {
- CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ ==
PROP_COMMENT).foreach { key =>
+ tableLegacyProperties.foreach { key =>
withNamespaceAndTable("ns", "tbl") { t =>
val sqlText = s"ALTER TABLE $t UNSET TBLPROPERTIES ('$key')"
checkError(
@@ -129,7 +129,7 @@ trait AlterTableUnsetTblPropertiesSuiteBase extends
QueryTest with DDLCommandTes
}
}
withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) {
- CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ ==
PROP_COMMENT).foreach { key =>
+ tableLegacyProperties.foreach { key =>
Seq("OPTIONS", "TBLPROPERTIES").foreach { clause =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (key int) USING parquet $clause
('$key'='bar')")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala
index 39f2abd35c2b..39624a33d861 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala
@@ -26,6 +26,7 @@ import org.scalatest.Tag
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.test.SQLTestUtils
@@ -172,6 +173,11 @@ trait DDLCommandTestUtils extends SQLTestUtils {
FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))
part1Loc
}
+
+ def tableLegacyProperties: Seq[String] = {
+ val excludedProperties = Set(TableCatalog.PROP_COMMENT,
TableCatalog.PROP_COLLATION)
+
CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(excludedProperties.contains)
+ }
}
object DDLCommandTestUtils {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 8b868c0e1723..d38708ab3745 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -498,6 +498,7 @@ class DDLParserSuite extends AnalysisTest with
SharedSparkSession {
UnresolvedIdentifier(Seq("view1")),
Seq.empty[(String, Option[String])],
None,
+ None,
Map.empty[String, String],
Some("SELECT * FROM tab1"),
parser.parsePlan("SELECT * FROM tab1"),
@@ -513,6 +514,7 @@ class DDLParserSuite extends AnalysisTest with
SharedSparkSession {
Seq("a").asTableIdentifier,
Seq.empty[(String, Option[String])],
None,
+ None,
Map.empty[String, String],
Some("SELECT * FROM tab1"),
parser.parsePlan("SELECT * FROM tab1"),
@@ -539,6 +541,7 @@ class DDLParserSuite extends AnalysisTest with
SharedSparkSession {
|(col1, col3 COMMENT 'hello')
|TBLPROPERTIES('prop1Key'="prop1Val")
|COMMENT 'BLABLA'
+ |DEFAULT COLLATION uNiCodE
|AS SELECT * FROM tab1
""".stripMargin
val parsed1 = parser.parsePlan(v1)
@@ -546,6 +549,7 @@ class DDLParserSuite extends AnalysisTest with
SharedSparkSession {
UnresolvedIdentifier(Seq("view1")),
Seq("col1" -> None, "col3" -> Some("hello")),
Some("BLABLA"),
+ Some("UNICODE"),
Map("prop1Key" -> "prop1Val"),
Some("SELECT * FROM tab1"),
parser.parsePlan("SELECT * FROM tab1"),
@@ -559,6 +563,7 @@ class DDLParserSuite extends AnalysisTest with
SharedSparkSession {
|CREATE OR REPLACE GLOBAL TEMPORARY VIEW a
|(col1, col3 COMMENT 'hello')
|COMMENT 'BLABLA'
+ |DEFAULT COLLATION uNiCoDe
|AS SELECT * FROM tab1
""".stripMargin
val parsed2 = parser.parsePlan(v2)
@@ -566,6 +571,7 @@ class DDLParserSuite extends AnalysisTest with
SharedSparkSession {
Seq("a").asTableIdentifier,
Seq("col1" -> None, "col3" -> Some("hello")),
Some("BLABLA"),
+ Some("UNICODE"),
Map(),
Some("SELECT * FROM tab1"),
parser.parsePlan("SELECT * FROM tab1"),
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
index c4e9ff93ef85..f8d2e9dd3a3c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
@@ -293,4 +293,29 @@ trait DescribeTableSuiteBase extends QueryTest with
DDLCommandTestUtils {
Row("col1", "string", null)))
}
}
+
+ Seq(true, false).foreach { hasCollations =>
+ test(s"DESCRIBE TABLE EXTENDED with collation specified = $hasCollations")
{
+
+ withNamespaceAndTable("ns", "tbl") { tbl =>
+ val getCollationDescription = () => sql(s"DESCRIBE TABLE EXTENDED
$tbl")
+ .where("col_name = 'Collation'")
+
+ val defaultCollation = if (hasCollations) "DEFAULT COLLATION uNiCoDe"
else ""
+
+ sql(s"CREATE TABLE $tbl (id string) $defaultUsing $defaultCollation")
+ val descriptionDf = getCollationDescription()
+
+ if (hasCollations) {
+ checkAnswer(descriptionDf, Seq(Row("Collation", "UNICODE", "")))
+ } else {
+ assert(descriptionDf.isEmpty)
+ }
+
+ sql(s"ALTER TABLE $tbl DEFAULT COLLATION UniCode_cI_rTrIm")
+ val newDescription = getCollationDescription()
+ checkAnswer(newDescription, Seq(Row("Collation", "UNICODE_CI_RTRIM",
"")))
+ }
+ }
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
index eaf016ac2fa9..164ac2bff9f6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala
@@ -218,6 +218,7 @@ class DescribeTableSuite extends DescribeTableSuiteBase
with CommandSuiteBase {
" PARTITIONED BY (id)" +
" TBLPROPERTIES ('bar'='baz')" +
" COMMENT 'this is a test table'" +
+ " DEFAULT COLLATION unicode" +
" LOCATION 'file:/tmp/testcat/table_name'")
val descriptionDf = spark.sql(s"DESCRIBE TABLE EXTENDED $tbl")
assert(descriptionDf.schema.map(field => (field.name, field.dataType))
=== Seq(
@@ -241,6 +242,7 @@ class DescribeTableSuite extends DescribeTableSuiteBase
with CommandSuiteBase {
Row("Type", "EXTERNAL", ""),
Row("Provider", getProvider(), ""),
Row("Comment", "this is a test table", ""),
+ Row("Collation", "UNICODE", ""),
Row("Table Properties", "[bar=baz]", ""),
Row("Location", "file:/tmp/testcat/table_name", ""),
Row("Partition Provider", "Catalog", "")))
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index fd4d3220f367..acc588fb719c 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -518,6 +518,8 @@ private[hive] class HiveClientImpl(
val excludedTableProperties = HiveStatisticsProperties ++ Set(
// The property value of "comment" is moved to the dedicated field
"comment"
"comment",
+ // The property value of "collation" is moved to the dedicated field
"collation"
+ "collation",
// For EXTERNAL_TABLE, the table properties has a particular field
"EXTERNAL". This is added
// in the function toHiveTable.
"EXTERNAL"
@@ -527,6 +529,7 @@ private[hive] class HiveClientImpl(
case (key, _) => excludedTableProperties.contains(key)
}
val comment = properties.get("comment")
+ val collation = properties.get("collation")
CatalogTable(
identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
@@ -569,6 +572,7 @@ private[hive] class HiveClientImpl(
properties = filteredProperties,
stats = readHiveStats(properties),
comment = comment,
+ collation = collation,
// In older versions of Spark(before 2.2.0), we expand the view original
text and
// store that into `viewExpandedText`, that should be used in view
resolution.
// We get `viewExpandedText` as viewText, and also get
`viewOriginalText` in order to
@@ -1212,6 +1216,7 @@ private[hive] object HiveClientImpl extends Logging {
table.storage.properties.foreach { case (k, v) =>
hiveTable.setSerdeParam(k, v) }
table.properties.foreach { case (k, v) => hiveTable.setProperty(k, v) }
table.comment.foreach { c => hiveTable.setProperty("comment", c) }
+ table.collation.foreach { c => hiveTable.setProperty("collation", c) }
// Hive will expand the view text, so it needs 2 fields: viewOriginalText
and viewExpandedText.
// Since we don't expand the view text, but only add table properties, we
map the `viewText` to
// the both fields in hive table.
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index 5c65eb8b12ba..27dc80fbfc17 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier,
TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException,
NoSuchDatabaseException, PartitionsAlreadyExistException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Literal}
+import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.test.TestHiveVersion
import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -68,11 +69,13 @@ class HiveClientSuite(version: String) extends
HiveVersionSuite(version) {
}
def table(database: String, tableName: String,
+ collation: Option[String] = None,
tableType: CatalogTableType = CatalogTableType.MANAGED): CatalogTable = {
CatalogTable(
identifier = TableIdentifier(tableName, Some(database)),
tableType = tableType,
schema = new StructType().add("key", "int"),
+ collation = collation,
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = Some(classOf[TextInputFormat].getName),
@@ -204,6 +207,22 @@ class HiveClientSuite(version: String) extends
HiveVersionSuite(version) {
ignoreIfExists = false)
}
+ test("create/alter table with collations") {
+ client.createTable(table("default", tableName = "collation_table",
+ collation = Some("UNICODE")), ignoreIfExists = false)
+
+ val readBack = client.getTable("default", "collation_table")
+ assert(!readBack.properties.contains(TableCatalog.PROP_COLLATION))
+ assert(readBack.collation === Some("UNICODE"))
+
+ client.alterTable("default", "collation_table",
+ readBack.copy(collation = Some("UNICODE_CI")))
+ val alteredTbl = client.getTable("default", "collation_table")
+ assert(alteredTbl.collation === Some("UNICODE_CI"))
+
+ client.dropTable("default", "collation_table", ignoreIfNotExists = true,
purge = true)
+ }
+
test("loadTable") {
client.loadTable(
emptyDir,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]