This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e1a5d94 [SPARK-36449][SQL] v2 ALTER TABLE REPLACE COLUMNS should
check duplicates for the user specified columns
e1a5d94 is described below
commit e1a5d9411733437b5a18045bbd18b48f7aa40f46
Author: Terry Kim <[email protected]>
AuthorDate: Tue Aug 10 13:20:29 2021 +0800
[SPARK-36449][SQL] v2 ALTER TABLE REPLACE COLUMNS should check duplicates
for the user specified columns
### What changes were proposed in this pull request?
Currently, v2 ALTER TABLE REPLACE COLUMNS does not check duplicates for the
user specified columns. For example,
```
spark.sql(s"CREATE TABLE $t (id int) USING $v2Format")
spark.sql(s"ALTER TABLE $t REPLACE COLUMNS (data string, data string)")
```
doesn't fail the analysis, and it's up to the catalog implementation to
handle it.
### Why are the changes needed?
To check the duplicate columns during analysis.
### Does this PR introduce _any_ user-facing change?
Yes, now the above command will print out the following:
```
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the
user specified columns: `data`
```
### How was this patch tested?
Added new unit tests
Closes #33676 from imback82/replace_cols_duplicates.
Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/catalyst/analysis/CheckAnalysis.scala | 25 ++++++++++++++--------
.../spark/sql/connector/AlterTableTests.scala | 11 ++++++++++
.../connector/V2CommandsCaseSensitivitySuite.scala | 14 ++++++++++--
3 files changed, 39 insertions(+), 11 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 9dc5db8..d38327a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -933,26 +933,33 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog {
* Validates the options used for alter table commands after table and
columns are resolved.
*/
private def checkAlterTableCommand(alter: AlterTableCommand): Unit = {
- def checkColumnNotExists(
- op: String, fieldNames: Seq[String], struct: StructType, r: Resolver):
Unit = {
- if (struct.findNestedField(fieldNames, includeCollections = true,
r).isDefined) {
+ def checkColumnNotExists(op: String, fieldNames: Seq[String], struct:
StructType): Unit = {
+ if (struct.findNestedField(
+ fieldNames, includeCollections = true,
alter.conf.resolver).isDefined) {
alter.failAnalysis(s"Cannot $op column, because ${fieldNames.quoted} "
+
s"already exists in ${struct.treeString}")
}
}
+ def checkColumnNameDuplication(colsToAdd: Seq[QualifiedColType]): Unit = {
+ SchemaUtils.checkColumnNameDuplication(
+ colsToAdd.map(_.name.quoted),
+ "in the user specified columns",
+ alter.conf.resolver)
+ }
+
alter match {
case AddColumns(table: ResolvedTable, colsToAdd) =>
colsToAdd.foreach { colToAdd =>
- checkColumnNotExists("add", colToAdd.name, table.schema,
alter.conf.resolver)
+ checkColumnNotExists("add", colToAdd.name, table.schema)
}
- SchemaUtils.checkColumnNameDuplication(
- colsToAdd.map(_.name.quoted),
- "in the user specified columns",
- alter.conf.resolver)
+ checkColumnNameDuplication(colsToAdd)
+
+ case ReplaceColumns(_: ResolvedTable, colsToAdd) =>
+ checkColumnNameDuplication(colsToAdd)
case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName)
=>
- checkColumnNotExists("rename", col.path :+ newName, table.schema,
alter.conf.resolver)
+ checkColumnNotExists("rename", col.path :+ newName, table.schema)
case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _,
_, _) =>
val fieldName = col.name.quoted
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 1bd45f5..1b0898f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -1175,4 +1175,15 @@ trait AlterTableTests extends SharedSparkSession {
StructField("col3", IntegerType).withComment("c3"))))
}
}
+
+ test("SPARK-36449: Replacing columns with duplicate name should not be
allowed") {
+ val t = s"${catalogAndNamespace}table_name"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (data string) USING $v2Format")
+ val e = intercept[AnalysisException] {
+ sql(s"ALTER TABLE $t REPLACE COLUMNS (data string, data1 string, data
string)")
+ }
+ assert(e.message.contains("Found duplicate column(s) in the user
specified columns: `data`"))
+ }
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index 0cc8d05..f262cf1 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.connector
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest,
CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2,
TestTable2, UnresolvedFieldName, UnresolvedFieldPosition}
-import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn,
AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan,
QualifiedColType, RenameColumn, ReplaceTableAsSelect}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn,
AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan,
QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -316,8 +316,18 @@ class V2CommandsCaseSensitivitySuite extends
SharedSparkSession with AnalysisTes
}
}
+ test("SPARK-36449: Replacing columns with duplicate name should not be
allowed") {
+ alterTableTest(
+ ReplaceColumns(
+ table,
+ Seq(QualifiedColType(None, "f", LongType, true, None, None),
+ QualifiedColType(None, "F", LongType, true, None, None))),
+ Seq("Found duplicate column(s) in the user specified columns: `f`"),
+ expectErrorOnCaseSensitive = false)
+ }
+
private def alterTableTest(
- alter: AlterTableCommand,
+ alter: => AlterTableCommand,
error: Seq[String],
expectErrorOnCaseSensitive: Boolean = true): Unit = {
Seq(true, false).foreach { caseSensitive =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]