This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 931ab065df39 [SPARK-48824][SQL] Add Identity Column SQL syntax
931ab065df39 is described below
commit 931ab065df3952487028316ebd49c2895d947bf2
Author: zhipeng.mao <[email protected]>
AuthorDate: Sun Sep 15 13:35:00 2024 +0800
[SPARK-48824][SQL] Add Identity Column SQL syntax
### What changes were proposed in this pull request?
Add SQL support for creating identity columns. Users can declare a column as
`GENERATED ALWAYS AS IDENTITY(identityColumnSpec)`, where identity values are
**always** generated by the system, or `GENERATED BY DEFAULT AS
IDENTITY(identityColumnSpec)`, where users may still insert explicit identity values.
Users can optionally specify the starting value of the column (default = 1)
and the increment/step of the column (default = 1). We also accept both
`START WITH <start> INCREMENT BY <step>`
and
`INCREMENT BY <step> START WITH <start>`.
This flexible ordering of the start and increment clauses is allowed because
both variants are used in the wild by other systems (e.g.
[PostgreSQL](https://www.postgresql.org/docs/current/sql-createsequence.html) and
[Oracle](https://docs.oracle.com/en/database/oracle/oracle-database/23/sqlrf/CREATE-SEQUENCE.html#GUID-E9C78A8C-615A-4757-B2A8-5E6EFB130571)).
For example, we can define
```
CREATE TABLE default.example (
  id LONG GENERATED ALWAYS AS IDENTITY,
  id1 LONG GENERATED ALWAYS AS IDENTITY(),
  id2 LONG GENERATED BY DEFAULT AS IDENTITY(START WITH 0),
  id3 LONG GENERATED ALWAYS AS IDENTITY(INCREMENT BY 2),
  id4 LONG GENERATED BY DEFAULT AS IDENTITY(START WITH 0 INCREMENT BY -10),
  id5 LONG GENERATED ALWAYS AS IDENTITY(INCREMENT BY 2 START WITH -8),
  value LONG
)
```
This will enable defining identity columns in Spark SQL for data sources
that support it.
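Malformed identity column specifications are rejected at parse time with the new
error conditions. A hedged sketch (each call below throws a `ParseException`,
mirroring the negative parser tests added in this PR; `spark` is an assumed
`SparkSession`):
```
// IDENTITY_COLUMNS_ILLEGAL_STEP: IDENTITY column step cannot be 0.
spark.sql("CREATE TABLE t1(id BIGINT GENERATED ALWAYS AS IDENTITY(INCREMENT BY 0), val INT) USING parquet")

// IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE: DataType FloatType is not supported for IDENTITY columns.
spark.sql("CREATE TABLE t2(id FLOAT GENERATED ALWAYS AS IDENTITY(), val INT) USING parquet")

// IDENTITY_COLUMNS_DUPLICATED_SEQUENCE_GENERATOR_OPTION: START specified twice.
spark.sql("CREATE TABLE t3(id BIGINT GENERATED ALWAYS AS IDENTITY(START WITH 0 START WITH 1), val INT) USING parquet")
```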
To be more specific, this PR
- Adds parser support for GENERATED { BY DEFAULT | ALWAYS } AS IDENTITY in
create/replace table statements. Identity column specifications are temporarily
stored in the field's metadata and are then parsed/verified in
DataSourceV2Strategy and used to instantiate a v2 `Column`.
- Adds TableCatalog::capabilities() and
TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS. This is
used to determine whether to allow specifying identity columns or to throw an
exception (a sketch follows this list).
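As an illustration of the capability gate, a minimal sketch modeled on the
`InMemoryTableCatalog` change in this PR (the subclass name is hypothetical):
```
import java.util

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.connector.catalog.TableCatalogCapability

// Hypothetical catalog that opts in to identity columns. Without this
// capability, IdentityColumn.validateIdentityColumn fails analysis with
// UNSUPPORTED_FEATURE.TABLE_OPERATION for any schema containing one.
class MyCatalog extends BasicInMemoryTableCatalog {
  override def capabilities: util.Set[TableCatalogCapability] = {
    Set(
      TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS
    ).asJava
  }
}
```
Under the hood, the parsed spec round-trips through the column's `StructField`
metadata (keys `identity.start`, `identity.step`, `identity.allowExplicitInsert`)
before being surfaced as a v2 `IdentityColumnSpec` on `Column`.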
### Why are the changes needed?
A SQL API is needed to create Identity Columns.
### Does this PR introduce _any_ user-facing change?
Yes. It adds the aforementioned SQL syntax for creating identity columns in a
table.
### How was this patch tested?
Positive and negative unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47614 from zhipengmao-db/zhipengmao-db/SPARK-48824-id-syntax.
Authored-by: zhipeng.mao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 24 +++
docs/sql-ref-ansi-compliance.md | 2 +
.../spark/sql/catalyst/parser/SqlBaseLexer.g4 | 2 +
.../spark/sql/catalyst/parser/SqlBaseParser.g4 | 21 +-
.../sql/connector/catalog/IdentityColumnSpec.java | 88 +++++++++
.../spark/sql/errors/QueryParsingErrors.scala | 19 ++
.../apache/spark/sql/connector/catalog/Column.java | 24 ++-
.../connector/catalog/TableCatalogCapability.java | 20 +-
.../spark/sql/catalyst/parser/AstBuilder.scala | 66 ++++++-
.../catalyst/plans/logical/ColumnDefinition.scala | 68 +++++--
.../spark/sql/catalyst/util/IdentityColumn.scala | 78 ++++++++
.../sql/connector/catalog/CatalogV2Util.scala | 47 +++--
.../spark/sql/internal/connector/ColumnImpl.scala | 3 +-
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 213 ++++++++++++++++++++-
.../connector/catalog/InMemoryTableCatalog.scala | 3 +-
.../execution/datasources/DataSourceStrategy.scala | 7 +-
.../datasources/v2/DataSourceV2Strategy.scala | 5 +-
.../sql-tests/results/ansi/keywords.sql.out | 2 +
.../resources/sql-tests/results/keywords.sql.out | 2 +
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 58 ++++++
.../spark/sql/execution/command/DDLSuite.scala | 11 ++
.../ThriftServerWithSparkContextSuite.scala | 2 +-
22 files changed, 724 insertions(+), 41 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index a6d8550716b9..38472f44fb59 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1589,6 +1589,30 @@
],
"sqlState" : "42601"
},
+ "IDENTITY_COLUMNS_DUPLICATED_SEQUENCE_GENERATOR_OPTION" : {
+ "message" : [
+ "Duplicated IDENTITY column sequence generator option:
<sequenceGeneratorOption>."
+ ],
+ "sqlState" : "42601"
+ },
+ "IDENTITY_COLUMNS_ILLEGAL_STEP" : {
+ "message" : [
+ "IDENTITY column step cannot be 0."
+ ],
+ "sqlState" : "42611"
+ },
+ "IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE" : {
+ "message" : [
+ "DataType <dataType> is not supported for IDENTITY columns."
+ ],
+ "sqlState" : "428H2"
+ },
+ "IDENTITY_COLUMN_WITH_DEFAULT_VALUE" : {
+ "message" : [
+ "A column cannot have both a default value and an identity column
specification but column <colName> has default value: (<defaultValue>) and
identity column specification: (<identityColumnSpec>)."
+ ],
+ "sqlState" : "42623"
+ },
"ILLEGAL_DAY_OF_WEEK" : {
"message" : [
"Illegal input for day of week: <string>."
diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index fe5ddf27bf6c..7987e5eb6012 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -536,12 +536,14 @@ Below is a list of all the keywords in Spark SQL.
|HOUR|non-reserved|non-reserved|non-reserved|
|HOURS|non-reserved|non-reserved|non-reserved|
|IDENTIFIER|non-reserved|non-reserved|non-reserved|
+|IDENTITY|non-reserved|non-reserved|non-reserved|
|IF|non-reserved|non-reserved|not a keyword|
|IGNORE|non-reserved|non-reserved|non-reserved|
|IMMEDIATE|non-reserved|non-reserved|non-reserved|
|IMPORT|non-reserved|non-reserved|non-reserved|
|IN|reserved|non-reserved|reserved|
|INCLUDE|non-reserved|non-reserved|non-reserved|
+|INCREMENT|non-reserved|non-reserved|non-reserved|
|INDEX|non-reserved|non-reserved|non-reserved|
|INDEXES|non-reserved|non-reserved|non-reserved|
|INNER|reserved|strict-non-reserved|reserved|
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 96a58b99debe..c82ee57a2517 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -256,12 +256,14 @@ BINARY_HEX: 'X';
HOUR: 'HOUR';
HOURS: 'HOURS';
IDENTIFIER_KW: 'IDENTIFIER';
+IDENTITY: 'IDENTITY';
IF: 'IF';
IGNORE: 'IGNORE';
IMMEDIATE: 'IMMEDIATE';
IMPORT: 'IMPORT';
IN: 'IN';
INCLUDE: 'INCLUDE';
+INCREMENT: 'INCREMENT';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
INNER: 'INNER';
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 3ea408ca4270..1840b6887841 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -1297,7 +1297,22 @@ colDefinitionOption
;
generationExpression
- : GENERATED ALWAYS AS LEFT_PAREN expression RIGHT_PAREN
+    : GENERATED ALWAYS AS LEFT_PAREN expression RIGHT_PAREN      #generatedColumn
+    | GENERATED (ALWAYS | BY DEFAULT) AS IDENTITY identityColSpec?      #identityColumn
+ ;
+
+identityColSpec
+ : LEFT_PAREN sequenceGeneratorOption* RIGHT_PAREN
+ ;
+
+sequenceGeneratorOption
+ : START WITH start=sequenceGeneratorStartOrStep
+ | INCREMENT BY step=sequenceGeneratorStartOrStep
+ ;
+
+sequenceGeneratorStartOrStep
+ : MINUS? INTEGER_VALUE
+ | MINUS? BIGINT_LITERAL
;
complexColTypeList
@@ -1591,11 +1606,13 @@ ansiNonReserved
| HOUR
| HOURS
| IDENTIFIER_KW
+ | IDENTITY
| IF
| IGNORE
| IMMEDIATE
| IMPORT
| INCLUDE
+ | INCREMENT
| INDEX
| INDEXES
| INPATH
@@ -1942,12 +1959,14 @@ nonReserved
| HOUR
| HOURS
| IDENTIFIER_KW
+ | IDENTITY
| IF
| IGNORE
| IMMEDIATE
| IMPORT
| IN
| INCLUDE
+ | INCREMENT
| INDEX
| INDEXES
| INPATH
diff --git a/sql/api/src/main/java/org/apache/spark/sql/connector/catalog/IdentityColumnSpec.java b/sql/api/src/main/java/org/apache/spark/sql/connector/catalog/IdentityColumnSpec.java
new file mode 100644
index 000000000000..4a8943736bd3
--- /dev/null
+++ b/sql/api/src/main/java/org/apache/spark/sql/connector/catalog/IdentityColumnSpec.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Objects;
+
+/**
+ * Identity column specification.
+ */
+@Evolving
+public class IdentityColumnSpec {
+ private final long start;
+ private final long step;
+ private final boolean allowExplicitInsert;
+
+ /**
+ * Creates an identity column specification.
+ * @param start the start value to generate the identity values
+ * @param step the step value to generate the identity values
+   * @param allowExplicitInsert whether the identity column allows explicit insertion of values
+ */
+  public IdentityColumnSpec(long start, long step, boolean allowExplicitInsert) {
+ this.start = start;
+ this.step = step;
+ this.allowExplicitInsert = allowExplicitInsert;
+ }
+
+ /**
+ * @return the start value to generate the identity values
+ */
+ public long getStart() {
+ return start;
+ }
+
+ /**
+ * @return the step value to generate the identity values
+ */
+ public long getStep() {
+ return step;
+ }
+
+ /**
+ * @return whether the identity column allows explicit insertion of values
+ */
+ public boolean isAllowExplicitInsert() {
+ return allowExplicitInsert;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ IdentityColumnSpec that = (IdentityColumnSpec) o;
+ return start == that.start &&
+ step == that.step &&
+ allowExplicitInsert == that.allowExplicitInsert;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(start, step, allowExplicitInsert);
+ }
+
+ @Override
+ public String toString() {
+ return "IdentityColumnSpec{" +
+ "start=" + start +
+ ", step=" + step +
+ ", allowExplicitInsert=" + allowExplicitInsert +
+ "}";
+ }
+}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index 5f7fcb92f7bd..b19607a28f06 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -556,6 +556,25 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
ctx)
}
+ def identityColumnUnsupportedDataType(
+ ctx: IdentityColumnContext,
+ dataType: String): Throwable = {
+ new ParseException("IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE",
Map("dataType" -> dataType), ctx)
+ }
+
+ def identityColumnIllegalStep(ctx: IdentityColSpecContext): Throwable = {
+ new ParseException("IDENTITY_COLUMNS_ILLEGAL_STEP", Map.empty, ctx)
+ }
+
+ def identityColumnDuplicatedSequenceGeneratorOption(
+ ctx: IdentityColSpecContext,
+ sequenceGeneratorOption: String): Throwable = {
+ new ParseException(
+ "IDENTITY_COLUMNS_DUPLICATED_SEQUENCE_GENERATOR_OPTION",
+ Map("sequenceGeneratorOption" -> sequenceGeneratorOption),
+ ctx)
+ }
+
  def createViewWithBothIfNotExistsAndReplaceError(ctx: CreateViewContext): Throwable = {
new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0052", ctx)
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
index b191438dbc3e..8b32940d7a65 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
@@ -53,7 +53,7 @@ public interface Column {
boolean nullable,
String comment,
String metadataInJSON) {
-    return new ColumnImpl(name, dataType, nullable, comment, null, null, metadataInJSON);
+    return new ColumnImpl(name, dataType, nullable, comment, null, null, null, metadataInJSON);
}
static Column create(
@@ -63,7 +63,8 @@ public interface Column {
String comment,
ColumnDefaultValue defaultValue,
String metadataInJSON) {
-    return new ColumnImpl(name, dataType, nullable, comment, defaultValue, null, metadataInJSON);
+ return new ColumnImpl(name, dataType, nullable, comment, defaultValue,
+ null, null, metadataInJSON);
}
static Column create(
@@ -74,7 +75,18 @@ public interface Column {
String generationExpression,
String metadataInJSON) {
return new ColumnImpl(name, dataType, nullable, comment, null,
- generationExpression, metadataInJSON);
+ generationExpression, null, metadataInJSON);
+ }
+
+ static Column create(
+ String name,
+ DataType dataType,
+ boolean nullable,
+ String comment,
+ IdentityColumnSpec identityColumnSpec,
+ String metadataInJSON) {
+ return new ColumnImpl(name, dataType, nullable, comment, null,
+ null, identityColumnSpec, metadataInJSON);
}
/**
@@ -113,6 +125,12 @@ public interface Column {
@Nullable
String generationExpression();
+ /**
+   * Returns the identity column specification of this table column. Null means no identity column.
+ */
+ @Nullable
+ IdentityColumnSpec identityColumnSpec();
+
/**
* Returns the column metadata in JSON format.
*/
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
index 5ccb15ff1f0a..dceac1b484cf 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
@@ -59,5 +59,23 @@ public enum TableCatalogCapability {
* {@link TableCatalog#createTable}.
* See {@link Column#defaultValue()}.
*/
- SUPPORT_COLUMN_DEFAULT_VALUE
+ SUPPORT_COLUMN_DEFAULT_VALUE,
+
+ /**
+   * Signals that the TableCatalog supports defining identity columns upon table creation in SQL.
+ * <p>
+   * Without this capability, any create/replace table statements with an identity column defined
+ * in the table schema will throw an exception during analysis.
+ * <p>
+ * An identity column is defined with syntax:
+ * {@code colName colType GENERATED ALWAYS AS IDENTITY(identityColumnSpec)}
+ * or
+   * {@code colName colType GENERATED BY DEFAULT AS IDENTITY(identityColumnSpec)}
+   * identityColumnSpec is defined with syntax: {@code [START WITH start | INCREMENT BY step]*}
+ * <p>
+ * IdentitySpec is included in the column definition for APIs like
+ * {@link TableCatalog#createTable}.
+ * See {@link Column#identityColumnSpec()}.
+ */
+ SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index edcb417da123..cb0e0e35c370 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, IntervalUtils}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTimestamp, stringToTimestampWithoutTimeZone}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, TableCatalog, TableWritePrivilege}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, IdentityColumnSpec, SupportsNamespaces, TableCatalog, TableWritePrivilege}
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.errors.{DataTypeErrorsBase, QueryCompilationErrors, QueryParsingErrors, SqlScriptingErrors}
@@ -3619,13 +3619,19 @@ class AstBuilder extends DataTypeAstBuilder
}
}
+ val dataType = typedVisit[DataType](ctx.dataType)
ColumnDefinition(
name = name,
- dataType = typedVisit[DataType](ctx.dataType),
+ dataType = dataType,
nullable = nullable,
comment = commentSpec.map(visitCommentSpec),
defaultValue = defaultExpression.map(visitDefaultExpression),
-      generationExpression = generationExpression.map(visitGenerationExpression)
+ generationExpression = generationExpression.collect {
+ case ctx: GeneratedColumnContext => visitGeneratedColumn(ctx)
+ },
+ identityColumnSpec = generationExpression.collect {
+ case ctx: IdentityColumnContext => visitIdentityColumn(ctx, dataType)
+ }
)
}
@@ -3681,11 +3687,63 @@ class AstBuilder extends DataTypeAstBuilder
/**
* Create a generation expression string.
*/
-  override def visitGenerationExpression(ctx: GenerationExpressionContext): String =
+ override def visitGeneratedColumn(ctx: GeneratedColumnContext): String =
withOrigin(ctx) {
getDefaultExpression(ctx.expression(), "GENERATED").originalSQL
}
+ /**
+ * Parse and verify IDENTITY column definition.
+ *
+ * @param ctx The parser context.
+   * @param dataType The data type of column defined as IDENTITY column. Used for verification.
+ * @return Tuple containing start, step and allowExplicitInsert.
+ */
+ protected def visitIdentityColumn(
+ ctx: IdentityColumnContext,
+ dataType: DataType): IdentityColumnSpec = {
+ if (dataType != LongType && dataType != IntegerType) {
+      throw QueryParsingErrors.identityColumnUnsupportedDataType(ctx, dataType.toString)
+ }
+ // We support two flavors of syntax:
+ // (1) GENERATED ALWAYS AS IDENTITY (...)
+ // (2) GENERATED BY DEFAULT AS IDENTITY (...)
+ // (1) forbids explicit inserts, while (2) allows.
+ val allowExplicitInsert = ctx.BY() != null && ctx.DEFAULT() != null
+ val (start, step) = visitIdentityColSpec(ctx.identityColSpec())
+
+ new IdentityColumnSpec(start, step, allowExplicitInsert)
+ }
+
+  override def visitIdentityColSpec(ctx: IdentityColSpecContext): (Long, Long) = {
+ val defaultStart = 1
+ val defaultStep = 1
+ if (ctx == null) {
+ return (defaultStart, defaultStep)
+ }
+ var (start, step): (Option[Long], Option[Long]) = (None, None)
+ ctx.sequenceGeneratorOption().asScala.foreach { option =>
+ if (option.start != null) {
+ if (start.isDefined) {
+          throw QueryParsingErrors.identityColumnDuplicatedSequenceGeneratorOption(ctx, "START")
+ }
+ start = Some(option.start.getText.toLong)
+ } else if (option.step != null) {
+ if (step.isDefined) {
+          throw QueryParsingErrors.identityColumnDuplicatedSequenceGeneratorOption(ctx, "STEP")
+ }
+ step = Some(option.step.getText.toLong)
+ if (step.get == 0L) {
+ throw QueryParsingErrors.identityColumnIllegalStep(ctx)
+ }
+ } else {
+ throw SparkException
+ .internalError(s"Invalid identity column sequence generator
option: ${option.getText}")
+ }
+ }
+ (start.getOrElse(defaultStart), step.getOrElse(defaultStep))
+ }
+
/**
* Create an optional comment string.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
index 83e50aa33c70..043214711ccf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala
@@ -21,10 +21,10 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, UnaryExpression, Unevaluable}
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.util.GeneratedColumn
+import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.validateDefaultValueExpr
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.{CURRENT_DEFAULT_COLUMN_METADATA_KEY, EXISTS_DEFAULT_COLUMN_METADATA_KEY}
-import org.apache.spark.sql.connector.catalog.{Column => V2Column, ColumnDefaultValue}
+import org.apache.spark.sql.connector.catalog.{Column => V2Column, ColumnDefaultValue, IdentityColumnSpec}
import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.connector.ColumnImpl
@@ -41,7 +41,11 @@ case class ColumnDefinition(
comment: Option[String] = None,
defaultValue: Option[DefaultValueExpression] = None,
generationExpression: Option[String] = None,
+ identityColumnSpec: Option[IdentityColumnSpec] = None,
metadata: Metadata = Metadata.empty) extends Expression with Unevaluable {
+ assert(
+ generationExpression.isEmpty || identityColumnSpec.isEmpty,
+ "A ColumnDefinition cannot contain both a generation expression and an
identity column spec.")
override def children: Seq[Expression] = defaultValue.toSeq
@@ -58,6 +62,7 @@ case class ColumnDefinition(
comment.orNull,
defaultValue.map(_.toV2(statement, name)).orNull,
generationExpression.orNull,
+ identityColumnSpec.orNull,
if (metadata == Metadata.empty) null else metadata.json)
}
@@ -75,8 +80,19 @@ case class ColumnDefinition(
generationExpression.foreach { generationExpr =>
      metadataBuilder.putString(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY, generationExpr)
}
+ encodeIdentityColumnSpec(metadataBuilder)
StructField(name, dataType, nullable, metadataBuilder.build())
}
+
+  private def encodeIdentityColumnSpec(metadataBuilder: MetadataBuilder): Unit = {
+ identityColumnSpec.foreach { spec: IdentityColumnSpec =>
+      metadataBuilder.putLong(IdentityColumn.IDENTITY_INFO_START, spec.getStart)
+ metadataBuilder.putLong(IdentityColumn.IDENTITY_INFO_STEP, spec.getStep)
+ metadataBuilder.putBoolean(
+ IdentityColumn.IDENTITY_INFO_ALLOW_EXPLICIT_INSERT,
+ spec.isAllowExplicitInsert)
+ }
+ }
}
object ColumnDefinition {
@@ -87,6 +103,9 @@ object ColumnDefinition {
metadataBuilder.remove(CURRENT_DEFAULT_COLUMN_METADATA_KEY)
metadataBuilder.remove(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
metadataBuilder.remove(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY)
+ metadataBuilder.remove(IdentityColumn.IDENTITY_INFO_START)
+ metadataBuilder.remove(IdentityColumn.IDENTITY_INFO_STEP)
+ metadataBuilder.remove(IdentityColumn.IDENTITY_INFO_ALLOW_EXPLICIT_INSERT)
val hasDefaultValue = col.getCurrentDefaultValue().isDefined &&
col.getExistenceDefaultValue().isDefined
@@ -97,6 +116,15 @@ object ColumnDefinition {
None
}
val generationExpr = GeneratedColumn.getGenerationExpression(col)
+    val identityColumnSpec = if (col.metadata.contains(IdentityColumn.IDENTITY_INFO_START)) {
+ Some(new IdentityColumnSpec(
+ col.metadata.getLong(IdentityColumn.IDENTITY_INFO_START),
+ col.metadata.getLong(IdentityColumn.IDENTITY_INFO_STEP),
+        col.metadata.getBoolean(IdentityColumn.IDENTITY_INFO_ALLOW_EXPLICIT_INSERT)
+ ))
+ } else {
+ None
+ }
ColumnDefinition(
col.name,
col.dataType,
@@ -104,6 +132,7 @@ object ColumnDefinition {
col.getComment(),
defaultValue,
generationExpr,
+ identityColumnSpec,
metadataBuilder.build()
)
}
@@ -124,18 +153,8 @@ object ColumnDefinition {
s"Command $cmd should not have column default value expression.")
}
cmd.columns.foreach { col =>
-          if (col.defaultValue.isDefined && col.generationExpression.isDefined) {
- throw new AnalysisException(
- errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE",
- messageParameters = Map(
- "colName" -> col.name,
- "defaultValue" -> col.defaultValue.get.originalSQL,
- "genExpr" -> col.generationExpression.get
- )
- )
- }
-
col.defaultValue.foreach { default =>
+ checkDefaultColumnConflicts(col)
            validateDefaultValueExpr(default, statement, col.name, col.dataType)
}
}
@@ -143,6 +162,29 @@ object ColumnDefinition {
case _ =>
}
}
+
+ private def checkDefaultColumnConflicts(col: ColumnDefinition): Unit = {
+ if (col.generationExpression.isDefined) {
+ throw new AnalysisException(
+ errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE",
+ messageParameters = Map(
+ "colName" -> col.name,
+ "defaultValue" -> col.defaultValue.get.originalSQL,
+ "genExpr" -> col.generationExpression.get
+ )
+ )
+ }
+ if (col.identityColumnSpec.isDefined) {
+ throw new AnalysisException(
+ errorClass = "IDENTITY_COLUMN_WITH_DEFAULT_VALUE",
+ messageParameters = Map(
+ "colName" -> col.name,
+ "defaultValue" -> col.defaultValue.get.originalSQL,
+ "identityColumnSpec" -> col.identityColumnSpec.get.toString
+ )
+ )
+ }
+ }
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IdentityColumn.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IdentityColumn.scala
new file mode 100644
index 000000000000..26a3cb026d31
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IdentityColumn.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.connector.catalog.{Identifier, IdentityColumnSpec, TableCatalog, TableCatalogCapability}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * This object contains utility methods and values for Identity Columns
+ */
+object IdentityColumn {
+ val IDENTITY_INFO_START = "identity.start"
+ val IDENTITY_INFO_STEP = "identity.step"
+ val IDENTITY_INFO_ALLOW_EXPLICIT_INSERT = "identity.allowExplicitInsert"
+
+ /**
+   * If `schema` contains any identity columns, check whether the table catalog supports identity
+   * columns. Otherwise throw an error.
+ */
+ def validateIdentityColumn(
+ schema: StructType,
+ catalog: TableCatalog,
+ ident: Identifier): Unit = {
+ if (hasIdentityColumns(schema)) {
+ if (!catalog
+ .capabilities()
+        .contains(TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS)) {
+ throw QueryCompilationErrors.unsupportedTableOperationError(
+ catalog, ident, operation = "identity column"
+ )
+ }
+ }
+ }
+
+ /**
+ * Whether the given `field` is an identity column
+ */
+ def isIdentityColumn(field: StructField): Boolean = {
+ field.metadata.contains(IDENTITY_INFO_START)
+ }
+
+ /**
+   * Returns the identity information stored in the column metadata if it exists
+ */
+ def getIdentityInfo(field: StructField): Option[IdentityColumnSpec] = {
+ if (isIdentityColumn(field)) {
+      Some(new IdentityColumnSpec(
+        field.metadata.getLong(IDENTITY_INFO_START),
+        field.metadata.getLong(IDENTITY_INFO_STEP),
+        field.metadata.getBoolean(IDENTITY_INFO_ALLOW_EXPLICIT_INSERT)))
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Whether the `schema` has one or more identity columns
+ */
+ def hasIdentityColumns(schema: StructType): Boolean = {
+ schema.exists(isIdentityColumn)
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 6698f0a02140..9b7f68070a1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, Named
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
-import org.apache.spark.sql.catalyst.util.GeneratedColumn
+import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
@@ -579,18 +579,10 @@ private[sql] object CatalogV2Util {
val isDefaultColumn = f.getCurrentDefaultValue().isDefined &&
f.getExistenceDefaultValue().isDefined
val isGeneratedColumn = GeneratedColumn.isGeneratedColumn(f)
- if (isDefaultColumn && isGeneratedColumn) {
- throw new AnalysisException(
- errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE",
- messageParameters = Map(
- "colName" -> f.name,
- "defaultValue" -> f.getCurrentDefaultValue().get,
- "genExpr" -> GeneratedColumn.getGenerationExpression(f).get
- )
- )
- }
-
+ val isIdentityColumn = IdentityColumn.isIdentityColumn(f)
if (isDefaultColumn) {
+ checkDefaultColumnConflicts(f)
+
val e = analyze(
f,
statementType = "Column analysis",
@@ -611,10 +603,41 @@ private[sql] object CatalogV2Util {
Seq("comment", GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY))
Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull,
        GeneratedColumn.getGenerationExpression(f).get, metadataAsJson(cleanedMetadata))
+ } else if (isIdentityColumn) {
+ val cleanedMetadata = metadataWithKeysRemoved(
+ Seq("comment",
+ IdentityColumn.IDENTITY_INFO_START,
+ IdentityColumn.IDENTITY_INFO_STEP,
+ IdentityColumn.IDENTITY_INFO_ALLOW_EXPLICIT_INSERT))
+ Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull,
+        IdentityColumn.getIdentityInfo(f).get, metadataAsJson(cleanedMetadata))
} else {
val cleanedMetadata = metadataWithKeysRemoved(Seq("comment"))
Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull,
metadataAsJson(cleanedMetadata))
}
}
+
+ private def checkDefaultColumnConflicts(f: StructField): Unit = {
+ if (GeneratedColumn.isGeneratedColumn(f)) {
+ throw new AnalysisException(
+ errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE",
+ messageParameters = Map(
+ "colName" -> f.name,
+ "defaultValue" -> f.getCurrentDefaultValue().get,
+ "genExpr" -> GeneratedColumn.getGenerationExpression(f).get
+ )
+ )
+ }
+ if (IdentityColumn.isIdentityColumn(f)) {
+ throw new AnalysisException(
+ errorClass = "IDENTITY_COLUMN_WITH_DEFAULT_VALUE",
+ messageParameters = Map(
+ "colName" -> f.name,
+ "defaultValue" -> f.getCurrentDefaultValue().get,
+ "identityColumnSpec" ->
IdentityColumn.getIdentityInfo(f).get.toString
+ )
+ )
+ }
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
index 2a67ffc4bbef..47889410561e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.internal.connector
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue}
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, IdentityColumnSpec}
import org.apache.spark.sql.types.DataType
// The standard concrete implementation of data source V2 column.
@@ -28,4 +28,5 @@ case class ColumnImpl(
comment: String,
defaultValue: ColumnDefaultValue,
generationExpression: String,
+ identityColumnSpec: IdentityColumnSpec,
metadataInJSON: String) extends Column
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 0f2bb791f346..b7e2490b552c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -20,14 +20,16 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale
import org.apache.spark.SparkThrowable
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.IdentityColumnSpec
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, ClusterByTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{Decimal, IntegerType, LongType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.types.{DataType, Decimal, IntegerType, LongType, StringType, StructType, TimestampType}
import org.apache.spark.storage.StorageLevelMapper
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -2856,10 +2858,217 @@ class DDLParserSuite extends AnalysisTest {
exception = parseException(
"CREATE TABLE my_tab(a INT, b INT GENERATED ALWAYS AS a + 1) USING
PARQUET"),
condition = "PARSE_SYNTAX_ERROR",
- parameters = Map("error" -> "'a'", "hint" -> ": missing '('")
+ parameters = Map("error" -> "'a'", "hint" -> "")
)
}
+ test("SPARK-48824: implement parser support for " +
+ "GENERATED ALWAYS/BY DEFAULT AS IDENTITY columns in tables ") {
+ def parseAndCompareIdentityColumnPlan(
+ identityColumnDataTypeStr: String,
+ identityColumnDefStr: String,
+ identityColumnSpecStr: String,
+ expectedDataType: DataType,
+ expectedStart: Long,
+ expectedStep: Long,
+ expectedAllowExplicitInsert: Boolean): Unit = {
+ val columnsWithIdentitySpec = Seq(
+ ColumnDefinition(
+ name = "id",
+ dataType = expectedDataType,
+ nullable = true,
+ identityColumnSpec = Some(
+ new IdentityColumnSpec(
+ expectedStart,
+ expectedStep,
+ expectedAllowExplicitInsert
+ )
+ )
+ ),
+ ColumnDefinition("val", IntegerType)
+ )
+ comparePlans(
+ parsePlan(
+ s"CREATE TABLE my_tab(id $identityColumnDataTypeStr GENERATED
$identityColumnDefStr" +
+ s" AS IDENTITY $identityColumnSpecStr, val INT) USING parquet"
+ ),
+ CreateTable(
+ UnresolvedIdentifier(Seq("my_tab")),
+ columnsWithIdentitySpec,
+ Seq.empty[Transform],
+ UnresolvedTableSpec(
+ Map.empty[String, String],
+ Some("parquet"),
+ OptionList(Seq.empty),
+ None,
+ None,
+ None,
+ false
+ ),
+ false
+ )
+ )
+
+ comparePlans(
+ parsePlan(
+ s"REPLACE TABLE my_tab(id $identityColumnDataTypeStr GENERATED
$identityColumnDefStr" +
+ s" AS IDENTITY $identityColumnSpecStr, val INT) USING parquet"
+ ),
+ ReplaceTable(
+ UnresolvedIdentifier(Seq("my_tab")),
+ columnsWithIdentitySpec,
+ Seq.empty[Transform],
+ UnresolvedTableSpec(
+ Map.empty[String, String],
+ Some("parquet"),
+ OptionList(Seq.empty),
+ None,
+ None,
+ None,
+ false
+ ),
+ false
+ )
+ )
+ }
+ for {
+ identityColumnDefStr <- Seq("BY DEFAULT", "ALWAYS")
+ identityColumnDataTypeStr <- Seq("BIGINT", "INT")
+ } {
+ val expectedAllowExplicitInsert = identityColumnDefStr == "BY DEFAULT"
+ val expectedDataType = identityColumnDataTypeStr match {
+ case "BIGINT" => LongType
+ case "INT" => IntegerType
+ }
+ parseAndCompareIdentityColumnPlan(
+ identityColumnDataTypeStr,
+ identityColumnDefStr,
+ "(START WITH 2 INCREMENT BY 2)",
+ expectedDataType,
+ expectedStart = 2,
+ expectedStep = 2,
+ expectedAllowExplicitInsert = expectedAllowExplicitInsert)
+ parseAndCompareIdentityColumnPlan(
+ identityColumnDataTypeStr,
+ identityColumnDefStr,
+ "(START WITH -2 INCREMENT BY -2)",
+ expectedDataType,
+ expectedStart = -2,
+ expectedStep = -2,
+ expectedAllowExplicitInsert = expectedAllowExplicitInsert)
+ parseAndCompareIdentityColumnPlan(
+ identityColumnDataTypeStr,
+ identityColumnDefStr,
+ "(START WITH 2)",
+ expectedDataType,
+ expectedStart = 2,
+ expectedStep = 1,
+ expectedAllowExplicitInsert = expectedAllowExplicitInsert)
+ parseAndCompareIdentityColumnPlan(
+ identityColumnDataTypeStr,
+ identityColumnDefStr,
+ "(START WITH -2)",
+ expectedDataType,
+ expectedStart = -2,
+ expectedStep = 1,
+ expectedAllowExplicitInsert = expectedAllowExplicitInsert)
+ parseAndCompareIdentityColumnPlan(
+ identityColumnDataTypeStr,
+ identityColumnDefStr,
+ "(INCREMENT BY 2)",
+ expectedDataType,
+ expectedStart = 1,
+ expectedStep = 2,
+ expectedAllowExplicitInsert = expectedAllowExplicitInsert)
+ parseAndCompareIdentityColumnPlan(
+ identityColumnDataTypeStr,
+ identityColumnDefStr,
+ "(INCREMENT BY -2)",
+ expectedDataType,
+ expectedStart = 1,
+ expectedStep = -2,
+ expectedAllowExplicitInsert = expectedAllowExplicitInsert)
+ parseAndCompareIdentityColumnPlan(
+ identityColumnDataTypeStr,
+ identityColumnDefStr,
+ "()",
+ expectedDataType,
+ expectedStart = 1,
+ expectedStep = 1,
+ expectedAllowExplicitInsert = expectedAllowExplicitInsert)
+ parseAndCompareIdentityColumnPlan(
+ identityColumnDataTypeStr,
+ identityColumnDefStr,
+ "",
+ expectedDataType,
+ expectedStart = 1,
+ expectedStep = 1,
+ expectedAllowExplicitInsert = expectedAllowExplicitInsert)
+ }
+ }
+
+ test("SPARK-48824: Column cannot have both a generation expression and an
identity column spec") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ parsePlan(s"CREATE TABLE testcat.my_tab(id BIGINT GENERATED ALWAYS AS
1" +
+ s" GENERATED ALWAYS AS IDENTITY, val INT) USING foo")
+ },
+ condition = "PARSE_SYNTAX_ERROR",
+ parameters = Map("error" -> "'1'", "hint" -> "")
+ )
+ }
+
+ test("SPARK-48824: Identity column step must not be zero") {
+ checkError(
+ exception = intercept[ParseException] {
+ parsePlan(
+ s"CREATE TABLE testcat.my_tab" +
+ s"(id BIGINT GENERATED ALWAYS AS IDENTITY(INCREMENT BY 0), val
INT) USING foo"
+ )
+ },
+ condition = "IDENTITY_COLUMNS_ILLEGAL_STEP",
+ parameters = Map.empty,
+ context = ExpectedContext(
+ fragment = "id BIGINT GENERATED ALWAYS AS IDENTITY(INCREMENT BY 0)",
+ start = 28,
+ stop = 81)
+ )
+ }
+
+ test("SPARK-48824: Identity column datatype must be long or integer") {
+ checkError(
+ exception = intercept[ParseException] {
+ parsePlan(
+ s"CREATE TABLE testcat.my_tab(id FLOAT GENERATED ALWAYS AS
IDENTITY(), val INT) USING foo"
+ )
+ },
+ condition = "IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE",
+ parameters = Map("dataType" -> "FloatType"),
+ context =
+ ExpectedContext(fragment = "id FLOAT GENERATED ALWAYS AS IDENTITY()",
start = 28, stop = 66)
+ )
+ }
+
+ test("SPARK-48824: Identity column sequence generator option cannot be
duplicated") {
+ val identityColumnSpecStrs = Seq(
+ "(START WITH 0 START WITH 1)",
+ "(INCREMENT BY 1 INCREMENT BY 2)",
+ "(START WITH 0 INCREMENT BY 1 START WITH 1)",
+ "(INCREMENT BY 1 START WITH 0 INCREMENT BY 2)"
+ )
+ for {
+ identitySpecStr <- identityColumnSpecStrs
+ } {
+ val exception = intercept[ParseException] {
+ parsePlan(
+ s"CREATE TABLE testcat.my_tab" +
+ s"(id BIGINT GENERATED ALWAYS AS IDENTITY $identitySpecStr, val
INT) USING foo"
+ )
+ }
+      assert(exception.getErrorClass === "IDENTITY_COLUMNS_DUPLICATED_SEQUENCE_GENERATOR_OPTION")
+ }
+ }
+
test("SPARK-42681: Relax ordering constraint for ALTER TABLE ADD COLUMN
options") {
    // Positive test cases to verify that column definition options could be applied in any order.
val expectedPlan = AddColumns(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 982de88e5884..56ed3bb243e1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -167,7 +167,8 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp
override def capabilities: java.util.Set[TableCatalogCapability] = {
Set(
TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE,
- TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS
+ TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS,
+ TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS
).asJava
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 1dd2659a1b16..2be4b236872f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoDir, I
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder}
+import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn, ResolveDefaultColumns, V2ExpressionBuilder}
import org.apache.spark.sql.connector.catalog.{SupportsRead, V1Table}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, NullOrdering, SortDirection, SortOrder => V2SortOrder, SortValue}
@@ -146,6 +146,11 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
tableDesc.identifier, "generated columns")
}
+ if (IdentityColumn.hasIdentityColumns(newSchema)) {
+ throw QueryCompilationErrors.unsupportedTableOperationError(
+ tableDesc.identifier, "identity columns")
+ }
+
val newTableDesc = tableDesc.copy(schema = newSchema)
      CreateDataSourceTableCommand(newTableDesc, ignoreIfExists = mode == SaveMode.Ignore)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 112ee2c5450b..d7f46c32f99a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -32,7 +32,8 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder}
+import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn,
+ IdentityColumn, ResolveDefaultColumns, V2ExpressionBuilder}
import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
@@ -185,6 +186,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
val statementType = "CREATE TABLE"
GeneratedColumn.validateGeneratedColumns(
c.tableSchema, catalog.asTableCatalog, ident, statementType)
+      IdentityColumn.validateIdentityColumn(c.tableSchema, catalog.asTableCatalog, ident)
CreateTableExec(
catalog.asTableCatalog,
@@ -214,6 +216,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
val statementType = "REPLACE TABLE"
GeneratedColumn.validateGeneratedColumns(
c.tableSchema, catalog.asTableCatalog, ident, statementType)
+      IdentityColumn.validateIdentityColumn(c.tableSchema, catalog.asTableCatalog, ident)
val v2Columns = columns.map(_.toV2Column(statementType)).toArray
catalog match {
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
index 81ccc0f9efc1..b464427d379a 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
@@ -142,6 +142,7 @@ HAVING true
HOUR false
HOURS false
IDENTIFIER false
+IDENTITY false
IF false
IGNORE false
ILIKE false
@@ -149,6 +150,7 @@ IMMEDIATE false
IMPORT false
IN true
INCLUDE false
+INCREMENT false
INDEX false
INDEXES false
INNER true
diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
index e145c57332eb..16436d7a722c 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
@@ -142,6 +142,7 @@ HAVING false
HOUR false
HOURS false
IDENTIFIER false
+IDENTITY false
IF false
IGNORE false
ILIKE false
@@ -149,6 +150,7 @@ IMMEDIATE false
IMPORT false
IN false
INCLUDE false
+INCREMENT false
INDEX false
INDEXES false
INNER false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 998d459cd436..5df7b62cfb28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1753,6 +1753,64 @@ class DataSourceV2SQLSuiteV1Filter
}
}
+ test("SPARK-48824: Column cannot have both an identity column spec and a
default value") {
+ val tblName = "my_tab"
+ val tableDefinition =
+ s"$tblName(id BIGINT GENERATED ALWAYS AS IDENTITY DEFAULT 0, name
STRING)"
+ withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> "foo") {
+ for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) {
+ withTable(s"testcat.$tblName") {
+ if (statement == "REPLACE TABLE") {
+ sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo")
+ }
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(s"$statement testcat.$tableDefinition USING foo")
+ },
+ condition = "IDENTITY_COLUMN_WITH_DEFAULT_VALUE",
+ parameters = Map(
+ "colName" -> "id",
+ "defaultValue" -> "0",
+ "identityColumnSpec" ->
+ "IdentityColumnSpec{start=1, step=1,
allowExplicitInsert=false}")
+ )
+ }
+ }
+ }
+ }
+
+ test("SPARK-48824: Identity columns only allowed with TableCatalogs that " +
+ "SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS") {
+ val tblName = "my_tab"
+ val tableDefinition =
+ s"$tblName(id BIGINT GENERATED ALWAYS AS IDENTITY(), val INT)"
+ for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) {
+      // InMemoryTableCatalog.capabilities() = {SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS}
+ withTable(s"testcat.$tblName") {
+ if (statement == "REPLACE TABLE") {
+ sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo")
+ }
+ // Can create table with an identity column
+ sql(s"$statement testcat.$tableDefinition USING foo")
+        assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName)))
+ }
+ // BasicInMemoryTableCatalog.capabilities() = {}
+ withSQLConf("spark.sql.catalog.dummy" ->
classOf[BasicInMemoryTableCatalog].getName) {
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql("USE dummy")
+ sql(s"$statement dummy.$tableDefinition USING foo")
+ },
+ condition = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+ parameters = Map(
+ "tableName" -> "`dummy`.`my_tab`",
+ "operation" -> "identity column"
+ )
+ )
+ }
+ }
+ }
+
test("SPARK-46972: asymmetrical replacement for char/varchar in
V2SessionCatalog.createTable") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 6e58b0e62ed6..8307326f17fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2288,6 +2288,17 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
)
}
+ test("SPARK-48824: No identity columns with V1") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(s"create table t(a int, b bigint generated always as identity())
using parquet")
+ },
+ condition = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+ parameters = Map("tableName" -> "`spark_catalog`.`default`.`t`",
+ "operation" -> "identity columns")
+ )
+ }
+
test("SPARK-44837: Error when altering partition column in non-delta table")
{
withTable("t") {
sql("CREATE TABLE t(i INT, j INT, k INT) USING parquet PARTITIONED BY
(i, j)")
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index edef6371be8a..5b8ee4ea9714 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
val sessionHandle = client.openSession(user, "")
    val infoValue = client.getInfo(sessionHandle, GetInfoType.CLI_ODBC_KEYWORDS)
// scalastyle:off line.size.limit
-    assert(infoValue.getStringValue == "ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLATIONS,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONSTRAINT,CONTAINS,COST,CREATE,CROSS,CUBE,CURRENT, [...]
+    assert(infoValue.getStringValue == "ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLATIONS,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONSTRAINT,CONTAINS,COST,CREATE,CROSS,CUBE,CURRENT, [...]
// scalastyle:on line.size.limit
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]