This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5ac88b12f86b [SPARK-44886][SQL] Introduce CLUSTER BY clause for
CREATE/REPLACE TABLE
5ac88b12f86b is described below
commit 5ac88b12f86b306e7612591154c26aebabb957a8
Author: Terry Kim <[email protected]>
AuthorDate: Thu Nov 9 19:30:12 2023 +0800
[SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE
### What changes were proposed in this pull request?
This PR proposes adding a `CLUSTER BY` clause to the CREATE/REPLACE TABLE SQL
syntax:
```
CREATE TABLE tbl(a int, b string) CLUSTER BY (a, b)
```
This doesn't introduce a default implementation for clustering; it is up to
the catalog/datasource implementation (e.g., Delta, Iceberg) to utilize the
clustering information.
### Why are the changes needed?
To introduce the concept of clustering to datasources.
### Does this PR introduce _any_ user-facing change?
Yes, this introduces a new SQL clause, `CLUSTER BY`, for CREATE/REPLACE TABLE.
### How was this patch tested?
Added extensive unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #42577 from imback82/cluster_by.
Lead-authored-by: Terry Kim <[email protected]>
Co-authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-classes.json | 12 +++
docs/sql-error-conditions.md | 12 +++
.../spark/sql/catalyst/parser/SqlBaseParser.g4 | 5 +
.../spark/sql/errors/QueryParsingErrors.scala | 8 ++
.../spark/sql/catalyst/catalog/interface.scala | 63 +++++++++++-
.../spark/sql/catalyst/parser/AstBuilder.scala | 46 +++++++--
.../sql/connector/catalog/CatalogV2Implicits.scala | 26 ++++-
.../sql/connector/expressions/expressions.scala | 36 +++++++
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 110 ++++++++++++++++++++-
.../sql/connector/catalog/InMemoryBaseTable.scala | 1 +
.../expressions/TransformExtractorSuite.scala | 43 +++++++-
.../catalyst/analysis/ResolveSessionCatalog.scala | 8 +-
.../spark/sql/execution/SparkSqlParser.scala | 3 +-
.../datasources/v2/V2SessionCatalog.scala | 8 +-
.../apache/spark/sql/internal/CatalogImpl.scala | 3 +-
.../command/CreateTableClusterBySuiteBase.scala | 83 ++++++++++++++++
.../command/v1/CreateTableClusterBySuite.scala | 51 ++++++++++
.../command/v2/CreateTableClusterBySuite.scala | 50 ++++++++++
.../command/CreateTableClusterBySuite.scala | 39 ++++++++
19 files changed, 583 insertions(+), 24 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json
b/common/utils/src/main/resources/error/error-classes.json
index c38171c3d9e6..26f6c0240afb 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2963,6 +2963,18 @@
],
"sqlState" : "42601"
},
+ "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED" : {
+ "message" : [
+ "Cannot specify both CLUSTER BY and CLUSTERED BY INTO BUCKETS."
+ ],
+ "sqlState" : "42908"
+ },
+ "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED" : {
+ "message" : [
+ "Cannot specify both CLUSTER BY and PARTITIONED BY."
+ ],
+ "sqlState" : "42908"
+ },
"SPECIFY_PARTITION_IS_NOT_ALLOWED" : {
"message" : [
"A CREATE TABLE without explicit column list cannot specify PARTITIONED
BY.",
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 8a5faa15dc9c..2cb433b19fa5 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1852,6 +1852,18 @@ A CREATE TABLE without explicit column list cannot
specify bucketing information
Please use the form with explicit column list and specify bucketing
information.
Alternatively, allow bucketing information to be inferred by omitting the
clause.
+### SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED
+
+[SQLSTATE:
42908](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify both CLUSTER BY and CLUSTERED BY INTO BUCKETS.
+
+### SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED
+
+[SQLSTATE:
42908](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify both CLUSTER BY and PARTITIONED BY.
+
### SPECIFY_PARTITION_IS_NOT_ALLOWED
[SQLSTATE:
42601](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 84a31dafed98..bd449a4e194e 100644
---
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -298,6 +298,10 @@ replaceTableHeader
: (CREATE OR)? REPLACE TABLE identifierReference
;
+clusterBySpec
+ : CLUSTER BY LEFT_PAREN multipartIdentifierList RIGHT_PAREN
+ ;
+
bucketSpec
: CLUSTERED BY identifierList
(SORTED BY orderedIdentifierList)?
@@ -383,6 +387,7 @@ createTableClauses
:((OPTIONS options=expressionPropertyList) |
(PARTITIONED BY partitioning=partitionFieldList) |
skewSpec |
+ clusterBySpec |
bucketSpec |
rowFormat |
createFileFormat |
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index 2067bf7d0955..841a678144f5 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -683,4 +683,12 @@ private[sql] object QueryParsingErrors extends
DataTypeErrorsBase {
ctx
)
}
+
+ def clusterByWithPartitionedBy(ctx: ParserRuleContext): Throwable = {
+ new ParseException(errorClass =
"SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED", ctx)
+ }
+
+ def clusterByWithBucketing(ctx: ParserRuleContext): Throwable = {
+ new ParseException(errorClass =
"SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED", ctx)
+ }
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 634afb47ea5e..066dbd9fad15 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -24,6 +24,9 @@ import java.util.Date
import scala.collection.mutable
import scala.util.control.NonFatal
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{ClassTagExtensions,
DefaultScalaModule}
import org.apache.commons.lang3.StringUtils
import org.json4s.JsonAST.{JArray, JString}
import org.json4s.jackson.JsonMethods._
@@ -31,7 +34,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CurrentUserContext, FunctionIdentifier,
InternalRow, SQLConfHelper, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation,
UnresolvedLeafNode}
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation,
Resolver, UnresolvedLeafNode}
import
org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap,
AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -39,10 +42,11 @@ import
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUti
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.expressions.{FieldReference,
NamedReference}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
/**
@@ -170,6 +174,55 @@ case class CatalogTablePartition(
}
}
+/**
+ * A container for clustering information.
+ *
+ * @param columnNames the names of the columns used for clustering.
+ */
+case class ClusterBySpec(columnNames: Seq[NamedReference]) {
+ override def toString: String = toJson
+
+ def toJson: String =
ClusterBySpec.mapper.writeValueAsString(columnNames.map(_.fieldNames))
+}
+
+object ClusterBySpec {
+ private val mapper = {
+ val ret = new ObjectMapper() with ClassTagExtensions
+ ret.setSerializationInclusion(Include.NON_ABSENT)
+ ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ ret.registerModule(DefaultScalaModule)
+ ret
+ }
+
+ def fromProperty(columns: String): ClusterBySpec = {
+
ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
+ }
+
+ def toProperty(
+ schema: StructType,
+ clusterBySpec: ClusterBySpec,
+ resolver: Resolver): (String, String) = {
+ CatalogTable.PROP_CLUSTERING_COLUMNS ->
+ normalizeClusterBySpec(schema, clusterBySpec, resolver).toJson
+ }
+
+ private def normalizeClusterBySpec(
+ schema: StructType,
+ clusterBySpec: ClusterBySpec,
+ resolver: Resolver): ClusterBySpec = {
+ val normalizedColumns = clusterBySpec.columnNames.map { columnName =>
+ val position = SchemaUtils.findColumnPosition(
+ columnName.fieldNames(), schema, resolver)
+ FieldReference(SchemaUtils.getColumnName(position, schema))
+ }
+
+ SchemaUtils.checkColumnNameDuplication(
+ normalizedColumns.map(_.toString),
+ resolver)
+
+ ClusterBySpec(normalizedColumns)
+ }
+}
/**
* A container for bucketing information.
@@ -462,6 +515,10 @@ case class CatalogTable(
if (value.isEmpty) key else s"$key: $value"
}.mkString("", "\n", "")
}
+
+ lazy val clusterBySpec: Option[ClusterBySpec] = {
+ properties.get(PROP_CLUSTERING_COLUMNS).map(ClusterBySpec.fromProperty)
+ }
}
object CatalogTable {
@@ -499,6 +556,8 @@ object CatalogTable {
val VIEW_STORING_ANALYZED_PLAN = VIEW_PREFIX + "storingAnalyzedPlan"
+ val PROP_CLUSTERING_COLUMNS: String = "clusteringColumns"
+
def splitLargeTableProp(
key: String,
value: String,
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 6d70ad29f876..eb501f56d81c 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkArithmeticException,
SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper,
TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec,
CatalogStorageFormat, ClusterBySpec}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AnyValue, First,
Last, PercentileCont, PercentileDisc}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -3241,6 +3241,15 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
})
}
+ /**
+ * Create a [[ClusterBySpec]].
+ */
+ override def visitClusterBySpec(ctx: ClusterBySpecContext): ClusterBySpec =
withOrigin(ctx) {
+ val columnNames = ctx.multipartIdentifierList.multipartIdentifier.asScala
+ .map(typedVisit[Seq[String]]).map(FieldReference(_)).toSeq
+ ClusterBySpec(columnNames)
+ }
+
/**
* Convert a property list into a key-value map.
* This should be called through [[visitPropertyKeyValues]] or
[[visitPropertyKeys]].
@@ -3341,6 +3350,7 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
* - location
* - comment
* - serde
+ * - clusterBySpec
*
* Note: Partition transforms are based on existing table schema definition.
It can be simple
* column names, or functions like `year(date_col)`. Partition columns are
column names with data
@@ -3348,7 +3358,7 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
*/
type TableClauses = (
Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String,
String],
- OptionList, Option[String], Option[String], Option[SerdeInfo])
+ OptionList, Option[String], Option[String], Option[SerdeInfo],
Option[ClusterBySpec])
/**
* Validate a create table statement and return the [[TableIdentifier]].
@@ -3809,6 +3819,7 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
+ checkDuplicateClauses(ctx.clusterBySpec(), "CLUSTER BY", ctx)
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
if (ctx.skewSpec.size > 0) {
@@ -3827,8 +3838,19 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
val comment = visitCommentSpecList(ctx.commentSpec())
val serdeInfo =
getSerdeInfo(ctx.rowFormat.asScala.toSeq,
ctx.createFileFormat.asScala.toSeq, ctx)
+ val clusterBySpec =
ctx.clusterBySpec().asScala.headOption.map(visitClusterBySpec)
+
+ if (clusterBySpec.isDefined) {
+ if (partCols.nonEmpty || partTransforms.nonEmpty) {
+ throw QueryParsingErrors.clusterByWithPartitionedBy(ctx)
+ }
+ if (bucketSpec.isDefined) {
+ throw QueryParsingErrors.clusterByWithBucketing(ctx)
+ }
+ }
+
(partTransforms, partCols, bucketSpec, cleanedProperties, cleanedOptions,
newLocation, comment,
- serdeInfo)
+ serdeInfo, clusterBySpec)
}
protected def getSerdeInfo(
@@ -3881,6 +3903,7 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
* [OPTIONS table_property_list]
* [ROW FORMAT row_format]
* [STORED AS file_format]
+ * [CLUSTER BY (col_name, col_name, ...)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
@@ -3902,7 +3925,7 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
.map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
val (partTransforms, partCols, bucketSpec, properties, options, location,
- comment, serdeInfo) = visitCreateTableClauses(ctx.createTableClauses())
+ comment, serdeInfo, clusterBySpec) =
visitCreateTableClauses(ctx.createTableClauses())
if (provider.isDefined && serdeInfo.isDefined) {
operationNotAllowed(s"CREATE TABLE ... USING ...
${serdeInfo.get.describe}", ctx)
@@ -3915,7 +3938,10 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
}
val partitioning =
- partitionExpressions(partTransforms, partCols, ctx) ++
bucketSpec.map(_.asTransform)
+ partitionExpressions(partTransforms, partCols, ctx) ++
+ bucketSpec.map(_.asTransform) ++
+ clusterBySpec.map(_.asTransform)
+
val tableSpec = UnresolvedTableSpec(properties, provider, options,
location, comment,
serdeInfo, external)
@@ -3958,6 +3984,7 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
* replace_table_clauses (order insensitive):
* [OPTIONS table_property_list]
* [PARTITIONED BY (partition_fields)]
+ * [CLUSTER BY (col_name, col_name, ...)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
@@ -3973,8 +4000,8 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
*/
override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan =
withOrigin(ctx) {
val orCreate = ctx.replaceTableHeader().CREATE() != null
- val (partTransforms, partCols, bucketSpec, properties, options, location,
comment, serdeInfo) =
- visitCreateTableClauses(ctx.createTableClauses())
+ val (partTransforms, partCols, bucketSpec, properties, options, location,
comment, serdeInfo,
+ clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())
val columns = Option(ctx.createOrReplaceTableColTypeList())
.map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
@@ -3984,7 +4011,10 @@ class AstBuilder extends DataTypeAstBuilder with
SQLConfHelper with Logging {
}
val partitioning =
- partitionExpressions(partTransforms, partCols, ctx) ++
bucketSpec.map(_.asTransform)
+ partitionExpressions(partTransforms, partCols, ctx) ++
+ bucketSpec.map(_.asTransform) ++
+ clusterBySpec.map(_.asTransform)
+
val tableSpec = UnresolvedTableSpec(properties, provider, options,
location, comment,
serdeInfo, external = false)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 8843a9fa237f..0c49f9e46730 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.connector.catalog
import scala.collection.mutable
+import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ClusterBySpec}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, QuotingUtils}
-import org.apache.spark.sql.connector.expressions.{BucketTransform,
FieldReference, IdentityTransform, LogicalExpressions, Transform}
+import org.apache.spark.sql.connector.expressions.{BucketTransform,
ClusterByTransform, FieldReference, IdentityTransform, LogicalExpressions,
Transform}
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
import org.apache.spark.sql.types.StructType
@@ -53,10 +54,15 @@ private[sql] object CatalogV2Implicits {
}
}
+ implicit class ClusterByHelper(spec: ClusterBySpec) {
+ def asTransform: Transform = clusterBy(spec.columnNames.toArray)
+ }
+
implicit class TransformHelper(transforms: Seq[Transform]) {
- def convertTransforms: (Seq[String], Option[BucketSpec]) = {
+ def convertTransforms: (Seq[String], Option[BucketSpec],
Option[ClusterBySpec]) = {
val identityCols = new mutable.ArrayBuffer[String]
var bucketSpec = Option.empty[BucketSpec]
+ var clusterBySpec = Option.empty[ClusterBySpec]
transforms.map {
case IdentityTransform(FieldReference(Seq(col))) =>
@@ -73,11 +79,23 @@ private[sql] object CatalogV2Implicits {
sortCol.map(_.fieldNames.mkString("."))))
}
+ case ClusterByTransform(columnNames) =>
+ if (clusterBySpec.nonEmpty) {
+ // AstBuilder guarantees that it only passes down one
ClusterByTransform.
+ throw SparkException.internalError("Cannot have multiple cluster
by transforms.")
+ }
+ clusterBySpec = Some(ClusterBySpec(columnNames))
+
case transform =>
throw
QueryExecutionErrors.unsupportedPartitionTransformError(transform)
}
- (identityCols.toSeq, bucketSpec)
+ // Parser guarantees that partition and clustering cannot co-exist.
+ assert(!(identityCols.toSeq.nonEmpty && clusterBySpec.nonEmpty))
+ // Parser guarantees that bucketing and clustering cannot co-exist.
+ assert(!(bucketSpec.nonEmpty && clusterBySpec.nonEmpty))
+
+ (identityCols.toSeq, bucketSpec, clusterBySpec)
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
index fbd2520e2a77..0037f52a21b7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
@@ -52,6 +52,9 @@ private[sql] object LogicalExpressions {
sortedCols: Array[NamedReference]): SortedBucketTransform =
SortedBucketTransform(literal(numBuckets, IntegerType), references,
sortedCols)
+ def clusterBy(references: Array[NamedReference]): ClusterByTransform =
+ ClusterByTransform(references)
+
def identity(reference: NamedReference): IdentityTransform =
IdentityTransform(reference)
def years(reference: NamedReference): YearsTransform =
YearsTransform(reference)
@@ -150,6 +153,39 @@ private[sql] object BucketTransform {
}
}
+/**
+ * This class represents a transform for [[ClusterBySpec]]. This is used to
bundle
+ * ClusterBySpec in CreateTable's partitioning transforms to pass it down to
analyzer.
+ */
+final case class ClusterByTransform(
+ columnNames: Seq[NamedReference]) extends RewritableTransform {
+
+ override val name: String = "cluster_by"
+
+ override def references: Array[NamedReference] = columnNames.toArray
+
+ override def arguments: Array[Expression] = columnNames.toArray
+
+ override def toString: String =
s"$name(${arguments.map(_.describe).mkString(", ")})"
+
+ override def withReferences(newReferences: Seq[NamedReference]): Transform =
{
+ this.copy(columnNames = newReferences)
+ }
+}
+
+/**
+ * Convenience extractor for ClusterByTransform.
+ */
+object ClusterByTransform {
+ def unapply(transform: Transform): Option[Seq[NamedReference]] =
+ transform match {
+ case NamedTransform("cluster_by", arguments) =>
+ Some(arguments.map(_.asInstanceOf[NamedReference]))
+ case _ =>
+ None
+ }
+}
+
private[sql] final case class SortedBucketTransform(
numBuckets: Literal[Int],
columns: Seq[NamedReference],
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index b1f734050449..2e896d563a73 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{EqualTo,
Hex, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{GeneratedColumn,
ResolveDefaultColumns}
import
org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
-import org.apache.spark.sql.connector.expressions.{ApplyTransform,
BucketTransform, DaysTransform, FieldReference, HoursTransform,
IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
+import org.apache.spark.sql.connector.expressions.{ApplyTransform,
BucketTransform, ClusterByTransform, DaysTransform, FieldReference,
HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform,
YearsTransform}
import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{Decimal, IntegerType, LongType,
MetadataBuilder, StringType, StructType, TimestampType}
@@ -190,6 +190,50 @@ class DDLParserSuite extends AnalysisTest {
}
}
+ test("create/replace table - with cluster by") {
+ // Testing cluster by single part and multipart name.
+ Seq(
+ ("a INT, b STRING, ts TIMESTAMP",
+ "a, b",
+ new StructType()
+ .add("a", IntegerType)
+ .add("b", StringType)
+ .add("ts", TimestampType),
+ ClusterByTransform(Seq(FieldReference("a"), FieldReference("b")))),
+ ("a STRUCT<b INT, c STRING>, ts TIMESTAMP",
+ "a.b, ts",
+ new StructType()
+ .add("a",
+ new StructType()
+ .add("b", IntegerType)
+ .add("c", StringType))
+ .add("ts", TimestampType),
+ ClusterByTransform(Seq(FieldReference(Seq("a", "b")),
FieldReference("ts"))))
+ ).foreach { case (columns, clusteringColumns, schema, clusterByTransform)
=>
+ val createSql =
+ s"""CREATE TABLE my_tab ($columns) USING parquet
+ |CLUSTER BY ($clusteringColumns)
+ |""".stripMargin
+ val replaceSql =
+ s"""REPLACE TABLE my_tab ($columns) USING parquet
+ |CLUSTER BY ($clusteringColumns)
+ |""".stripMargin
+ val expectedTableSpec = TableSpec(
+ Seq("my_tab"),
+ Some(schema),
+ Seq(clusterByTransform),
+ Map.empty[String, String],
+ Some("parquet"),
+ OptionList(Seq.empty),
+ None,
+ None,
+ None)
+ Seq(createSql, replaceSql).foreach { sql =>
+ testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists =
false)
+ }
+ }
+ }
+
test("create/replace table - with comment") {
val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet
COMMENT 'abc'"
val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet
COMMENT 'abc'"
@@ -859,6 +903,26 @@ class DDLParserSuite extends AnalysisTest {
fragment = sql18,
start = 0,
stop = 86))
+
+ val sql19 = createTableHeader("CLUSTER BY (a)")
+ checkError(
+ exception = parseException(sql19),
+ errorClass = "DUPLICATE_CLAUSES",
+ parameters = Map("clauseName" -> "CLUSTER BY"),
+ context = ExpectedContext(
+ fragment = sql19,
+ start = 0,
+ stop = 65))
+
+ val sql20 = replaceTableHeader("CLUSTER BY (a)")
+ checkError(
+ exception = parseException(sql20),
+ errorClass = "DUPLICATE_CLAUSES",
+ parameters = Map("clauseName" -> "CLUSTER BY"),
+ context = ExpectedContext(
+ fragment = sql20,
+ start = 0,
+ stop = 66))
}
test("support for other types in OPTIONS") {
@@ -2896,6 +2960,50 @@ class DDLParserSuite extends AnalysisTest {
)
}
+ test("create table cluster by with bucket") {
+ val sql1 = "CREATE TABLE my_tab(a INT, b STRING) " +
+ "USING parquet CLUSTERED BY (a) INTO 2 BUCKETS CLUSTER BY (a)"
+ checkError(
+ exception = parseException(sql1),
+ errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED",
+ parameters = Map.empty,
+ context = ExpectedContext(fragment = sql1, start = 0, stop = 96)
+ )
+ }
+
+ test("replace table cluster by with bucket") {
+ val sql1 = "REPLACE TABLE my_tab(a INT, b STRING) " +
+ "USING parquet CLUSTERED BY (a) INTO 2 BUCKETS CLUSTER BY (a)"
+ checkError(
+ exception = parseException(sql1),
+ errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED",
+ parameters = Map.empty,
+ context = ExpectedContext(fragment = sql1, start = 0, stop = 97)
+ )
+ }
+
+ test("create table cluster by with partitioned by") {
+ val sql1 = "CREATE TABLE my_tab(a INT, b STRING) " +
+ "USING parquet CLUSTER BY (a) PARTITIONED BY (a)"
+ checkError(
+ exception = parseException(sql1),
+ errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED",
+ parameters = Map.empty,
+ context = ExpectedContext(fragment = sql1, start = 0, stop = 83)
+ )
+ }
+
+ test("replace table cluster by with partitioned by") {
+ val sql1 = "REPLACE TABLE my_tab(a INT, b STRING) " +
+ "USING parquet CLUSTER BY (a) PARTITIONED BY (a)"
+ checkError(
+ exception = parseException(sql1),
+ errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED",
+ parameters = Map.empty,
+ context = ExpectedContext(fragment = sql1, start = 0, stop = 84)
+ )
+ }
+
test("AstBuilder don't support `INSERT OVERWRITE DIRECTORY`") {
val insertDirSql =
s"""
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index cd7f7295d5cb..318cbf6962c1 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -90,6 +90,7 @@ abstract class InMemoryBaseTable(
case _: HoursTransform =>
case _: BucketTransform =>
case _: SortedBucketTransform =>
+ case _: ClusterByTransform =>
case NamedTransform("truncate", Seq(_: NamedReference, _: Literal[_])) =>
case t if !allowUnsupportedTransforms =>
throw new IllegalArgumentException(s"Transform $t is not a supported
transform")
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/expressions/TransformExtractorSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/expressions/TransformExtractorSuite.scala
index 62cae3c86107..8ac268df80bc 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/expressions/TransformExtractorSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/expressions/TransformExtractorSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.expressions
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst
-import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket
+import org.apache.spark.sql.connector.expressions.LogicalExpressions.{bucket,
clusterBy}
import org.apache.spark.sql.types.DataType
class TransformExtractorSuite extends SparkFunSuite {
@@ -210,4 +210,45 @@ class TransformExtractorSuite extends SparkFunSuite {
val copied2 = sortedBucketTransform.withReferences(reference2)
assert(copied2.equals(sortedBucketTransform))
}
+
+ test("ClusterBySpec extractor") {
+ val col = ref("a", "b")
+ val clusterByTransform = new Transform {
+ override def name: String = "cluster_by"
+ override def references: Array[NamedReference] = Array(col)
+ override def arguments: Array[Expression] = Array(col)
+ override def toString: String = s"$name(${col.describe})"
+ }
+
+ clusterByTransform match {
+ case ClusterByTransform(columnNames) =>
+ assert(columnNames.size === 1)
+ assert(columnNames(0).fieldNames === Seq("a", "b"))
+ case _ =>
+ fail("Did not match ClusterByTransform extractor")
+ }
+
+ transform("unknown", ref("a", "b")) match {
+ case ClusterByTransform(_) =>
+ fail("Matched unknown transform")
+ case _ =>
+ // expected
+ }
+ }
+
+ test("test cluster by") {
+ val col = Array(ref("a a", "b"), ref("ts"))
+
+ val clusterByTransform = clusterBy(col)
+ val reference = clusterByTransform.references
+ assert(reference.length == 2)
+ assert(reference(0).fieldNames() === Seq("a a", "b"))
+ assert(reference(1).fieldNames() === Seq("ts"))
+ val arguments = clusterByTransform.arguments
+ assert(arguments.length == 2)
+ assert(arguments(0).asInstanceOf[NamedReference].fieldNames() === Seq("a
a", "b"))
+ assert(arguments(1).asInstanceOf[NamedReference].fieldNames() ===
Seq("ts"))
+ val copied = clusterByTransform.withReferences(reference)
+ assert(copied.equals(clusterByTransform))
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 174881a46592..c557ec4a486d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -21,7 +21,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
@@ -532,7 +532,7 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
} else {
CatalogTableType.MANAGED
}
- val (partitionColumns, maybeBucketSpec) = partitioning.convertTransforms
+ val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
partitioning.convertTransforms
CatalogTable(
identifier = table,
@@ -542,7 +542,9 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
provider = Some(provider),
partitionColumnNames = partitionColumns,
bucketSpec = maybeBucketSpec,
- properties = properties,
+ properties = properties ++
+ maybeClusterBySpec.map(
+ clusterBySpec => ClusterBySpec.toProperty(schema, clusterBySpec,
conf.resolver)),
comment = comment)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 1cc214772535..d8e5d4f22701 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -323,7 +323,8 @@ class SparkSqlAstBuilder extends AstBuilder {
operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
}
- val (_, _, _, _, options, location, _, _) =
visitCreateTableClauses(ctx.createTableClauses())
+ val (_, _, _, _, options, location, _, _, _) =
+ visitCreateTableClauses(ctx.createTableClauses())
val provider =
Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse(
throw QueryParsingErrors.createTempTableNotSpecifyProviderError(ctx))
val schema =
Option(ctx.createOrReplaceTableColTypeList()).map(createSchema)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index bd2e65974931..6dd76973baa5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper,
TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException,
NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable,
CatalogTableType, CatalogUtils, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable,
CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog}
import org.apache.spark.sql.catalyst.util.TypeUtils._
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util,
Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces,
Table, TableCatalog, TableCatalogCapability, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
@@ -114,7 +114,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
- val (partitionColumns, maybeBucketSpec) =
partitions.toSeq.convertTransforms
+ val (partitionColumns, maybeBucketSpec, maybeClusterBySpec) =
partitions.toSeq.convertTransforms
val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER,
conf.defaultDataSourceName)
val tableProperties = properties.asScala
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
@@ -135,7 +135,9 @@ class V2SessionCatalog(catalog: SessionCatalog)
provider = Some(provider),
partitionColumnNames = partitionColumns,
bucketSpec = maybeBucketSpec,
- properties = tableProperties.toMap,
+ properties = tableProperties.toMap ++
+ maybeClusterBySpec.map(
+ clusterBySpec => ClusterBySpec.toProperty(schema, clusterBySpec,
conf.resolver)),
tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index b1ad454fc041..d58cd001e941 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -379,7 +379,8 @@ class CatalogImpl(sparkSession: SparkSession) extends
Catalog {
val columns = sparkSession.sessionState.executePlan(plan).analyzed match {
case ResolvedTable(_, _, table, _) =>
- val (partitionColumnNames, bucketSpecOpt) =
table.partitioning.toSeq.convertTransforms
+ // TODO (SPARK-45787): Support clusterBySpec for listColumns().
+ val (partitionColumnNames, bucketSpecOpt, _) =
table.partitioning.toSeq.convertTransforms
val bucketColumnNames =
bucketSpecOpt.map(_.bucketColumnNames).getOrElse(Nil)
schemaToColumns(table.schema(), partitionColumnNames.contains,
bucketColumnNames.contains)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateTableClusterBySuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateTableClusterBySuiteBase.scala
new file mode 100644
index 000000000000..cb56d11b665d
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateTableClusterBySuiteBase.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+
+/**
+ * This base suite contains unified tests for the `CREATE/REPLACE TABLE ...
CLUSTER BY` command
+ * that check V1 and V2 table catalogs. The tests that cannot run for all
supported catalogs are
+ * located in more specific test suites:
+ *
+ * - V2 table catalog tests:
`org.apache.spark.sql.execution.command.v2.CreateTableClusterBySuite`
+ * - V1 table catalog tests:
+ *
`org.apache.spark.sql.execution.command.v1.CreateTableClusterBySuiteBase`
+ * - V1 In-Memory catalog:
`org.apache.spark.sql.execution.command.v1.CreateTableClusterBySuite`
+ * - V1 Hive External catalog:
+ *
`org.apache.spark.sql.hive.execution.command.CreateTableClusterBySuite`
+ */
+trait CreateTableClusterBySuiteBase extends QueryTest with DDLCommandTestUtils
{
+ override val command = "CREATE/REPLACE TABLE CLUSTER BY"
+
+ protected val nestedColumnSchema: String =
+ "col1 INT, col2 STRUCT<col3 INT, `col4 1` INT>, col3 STRUCT<`col4.1` INT>"
+ protected val nestedClusteringColumns: Seq[String] =
+ Seq("col2.col3", "col2.`col4 1`", "col3.`col4.1`")
+
+ def validateClusterBy(tableName: String, clusteringColumns: Seq[String]):
Unit
+
+ test("test basic CREATE TABLE with clustering columns") {
+ withNamespaceAndTable("ns", "table") { tbl =>
+ spark.sql(s"CREATE TABLE $tbl (id INT, data STRING) $defaultUsing
CLUSTER BY (id, data)")
+ validateClusterBy(tbl, Seq("id", "data"))
+ }
+ }
+
+ test("test clustering columns with comma") {
+ withNamespaceAndTable("ns", "table") { tbl =>
+ spark.sql(s"CREATE TABLE $tbl (`i,d` INT, data STRING) $defaultUsing " +
+ "CLUSTER BY (`i,d`, data)")
+ validateClusterBy(tbl, Seq("`i,d`", "data"))
+ }
+ }
+
+ test("test nested clustering columns") {
+ withNamespaceAndTable("ns", "table") { tbl =>
+ spark.sql(s"CREATE TABLE $tbl " +
+ s"($nestedColumnSchema) " +
+ s"$defaultUsing CLUSTER BY (${nestedClusteringColumns.mkString(",")})")
+ validateClusterBy(tbl, nestedClusteringColumns)
+ }
+ }
+
+ test("clustering columns not defined in schema") {
+ withNamespaceAndTable("ns", "table") { tbl =>
+ val err = intercept[AnalysisException] {
+ sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing CLUSTER
BY (unknown)")
+ }
+ assert(err.message.contains("Couldn't find column unknown in:"))
+ }
+ }
+
+ // Splits a three-part table name (catalog.namespace.table) into a
(catalog, namespace, table) tuple.
+ protected def parseTableName(threePartTableName: String): (String, String,
String) = {
+ val tablePath = threePartTableName.split('.')
+ assert(tablePath.length === 3)
+ (tablePath(0), tablePath(1), tablePath(2))
+ }
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/CreateTableClusterBySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/CreateTableClusterBySuite.scala
new file mode 100644
index 000000000000..2444fe062e28
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/CreateTableClusterBySuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v1
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
+import org.apache.spark.sql.connector.expressions.FieldReference
+import org.apache.spark.sql.execution.command
+
+/**
+ * This base suite contains unified tests for the `CREATE TABLE ... CLUSTER
BY` command that
+ * check V1 table catalogs. The tests that cannot run for all V1 catalogs are
located in more
+ * specific test suites:
+ *
+ * - V1 In-Memory catalog:
`org.apache.spark.sql.execution.command.v1.CreateTableClusterBySuite`
+ * - V1 Hive External catalog:
+ * `org.apache.spark.sql.hive.execution.command.CreateTableClusterBySuite`
+ */
+trait CreateTableClusterBySuiteBase extends
command.CreateTableClusterBySuiteBase
+ with command.TestsV1AndV2Commands {
+ override def validateClusterBy(tableName: String, clusteringColumns:
Seq[String]): Unit = {
+ val catalog = spark.sessionState.catalog
+ val (_, db, t) = parseTableName(tableName)
+ val table = catalog.getTableMetadata(TableIdentifier.apply(t, Some(db)))
+ assert(table.clusterBySpec ===
Some(ClusterBySpec(clusteringColumns.map(FieldReference(_)))))
+ }
+}
+
+/**
+ * The class contains tests for the `CREATE TABLE ... CLUSTER BY` command to
check V1 In-Memory
+ * table catalog.
+ */
+class CreateTableClusterBySuite extends CreateTableClusterBySuiteBase
+ with CommandSuiteBase {
+ override def commandVersion: String =
super[CreateTableClusterBySuiteBase].commandVersion
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateTableClusterBySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateTableClusterBySuite.scala
new file mode 100644
index 000000000000..86b14d668038
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateTableClusterBySuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.connector.catalog.{Identifier,
InMemoryPartitionTable}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
+import org.apache.spark.sql.connector.expressions.{ClusterByTransform,
FieldReference}
+import org.apache.spark.sql.execution.command
+
+/**
+ * The class contains tests for the `CREATE TABLE ... CLUSTER BY` command to
check V2 table
+ * catalogs.
+ */
+class CreateTableClusterBySuite extends command.CreateTableClusterBySuiteBase
+ with CommandSuiteBase {
+ override def validateClusterBy(tableName: String, clusteringColumns:
Seq[String]): Unit = {
+ val (catalog, namespace, table) = parseTableName(tableName)
+ val catalogPlugin = spark.sessionState.catalogManager.catalog(catalog)
+ val partTable = catalogPlugin.asTableCatalog
+ .loadTable(Identifier.of(Array(namespace), table))
+ .asInstanceOf[InMemoryPartitionTable]
+ assert(partTable.partitioning ===
+ Array(ClusterByTransform(clusteringColumns.map(FieldReference(_)))))
+ }
+
+ test("test REPLACE TABLE with clustering columns") {
+ withNamespaceAndTable("ns", "table") { tbl =>
+ spark.sql(s"CREATE TABLE $tbl (id INT) $defaultUsing CLUSTER BY (id)")
+ validateClusterBy(tbl, Seq("id"))
+
+ spark.sql(s"REPLACE TABLE $tbl (id2 INT) $defaultUsing CLUSTER BY (id2)")
+ validateClusterBy(tbl, Seq("id2"))
+ }
+ }
+}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/CreateTableClusterBySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/CreateTableClusterBySuite.scala
new file mode 100644
index 000000000000..496cc13c4971
--- /dev/null
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/CreateTableClusterBySuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.command
+
+import org.apache.spark.sql.execution.command.v1
+
+/**
+ * The class contains tests for the `CREATE TABLE ... CLUSTER BY` command to
check V1 Hive external
+ * table catalog.
+ */
+class CreateTableClusterBySuite extends v1.CreateTableClusterBySuiteBase with
CommandSuiteBase {
+ // Hive doesn't support nested column names containing spaces or dots.
+ override protected val nestedColumnSchema: String =
+ "col1 INT, col2 STRUCT<col3 INT, col4 INT>"
+ override protected val nestedClusteringColumns: Seq[String] =
+ Seq("col2.col3")
+
+ // Hive catalog doesn't support column names with commas.
+ override def excluded: Seq[String] = Seq(
+ s"$command using Hive V1 catalog V1 command: test clustering columns with
comma",
+ s"$command using Hive V1 catalog V2 command: test clustering columns with
comma")
+
+ override def commandVersion: String =
super[CreateTableClusterBySuiteBase].commandVersion
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]