This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 69e5ba3b56d9 [SPARK-54971] Add WITH SCHEMA EVOLUTION syntax for SQL INSERT
69e5ba3b56d9 is described below
commit 69e5ba3b56d99a7c298eff93b62f85a73ad16ae6
Author: Thang Long VU <[email protected]>
AuthorDate: Tue Jan 13 18:53:27 2026 +0800
[SPARK-54971] Add WITH SCHEMA EVOLUTION syntax for SQL INSERT
### What changes were proposed in this pull request?
Similar to the [MERGE WITH SCHEMA EVOLUTION
PR](https://github.com/apache/spark/pull/45748), **this PR introduces the
`WITH SCHEMA EVOLUTION` syntax for the SQL `INSERT` command.** Since this syntax is not
fully implemented for any table format yet, **users will receive an exception
if they try to use it.**
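For illustration (the table names below are placeholders), a statement like the following now parses, but analysis raises `UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION` for table formats that have not implemented it:

```scala
// Hypothetical tables; the statement parses after this PR, but table formats without
// INSERT schema-evolution support fail analysis with UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION.
spark.sql("INSERT WITH SCHEMA EVOLUTION INTO target_table SELECT * FROM source_table")
```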
When `WITH SCHEMA EVOLUTION` is specified, schema-evolution-related
features must be turned on for that single statement, and only for that statement.
**In this PR, Spark is only responsible for recognizing the presence or
absence of the `WITH SCHEMA EVOLUTION` syntax**, and that information is
propagated from the `Analyzer`. When `WITH SCHEMA EVOLUTION` is detected,
Spark sets the `mergeSchema` write option to `true` on the corresponding V2 insert
command nodes.
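As a minimal sketch (simplified from the `Analyzer` change in this patch), the parsed flag maps to the write option like this:

```scala
// Simplified from the Analyzer rule below: the parsed flag becomes a write option on the
// resolved V2 insert nodes (AppendData, OverwriteByExpression, OverwritePartitionsDynamic).
def schemaEvolutionWriteOption(withSchemaEvolution: Boolean): Map[String, String] =
  if (withSchemaEvolution) Map("mergeSchema" -> "true") else Map.empty
```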
Data sources must respect the syntax and react appropriately: they should turn
on the features they categorise as "schema evolution" when the `WITH SCHEMA
EVOLUTION` syntax is present.
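A connector-side sketch of what that could look like (the object and method names are illustrative and not part of this patch; only `LogicalWriteInfo.options()` is existing DSv2 API):

```scala
import org.apache.spark.sql.connector.write.LogicalWriteInfo

object SchemaEvolutionOption {
  // A connector could call this from its newWriteBuilder(info) implementation:
  // "mergeSchema" is the write option Spark sets to "true" for WITH SCHEMA EVOLUTION.
  def requested(info: LogicalWriteInfo): Boolean =
    info.options().getBoolean("mergeSchema", false)
}
```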
### Why are the changes needed?
This intuitive SQL syntax lets users enable automatic schema evolution
for a specific `INSERT` operation.
Some users want schema evolution for DML commands such as `MERGE` and
`INSERT`, where the schema of the target table and the schema of the source
query can mismatch.
### Does this PR introduce _any_ user-facing change?
Yes. This introduces the `WITH SCHEMA EVOLUTION` syntax for SQL `INSERT`.
### How was this patch tested?
Added unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53732 from longvu-db/insert-schema-evolution.
Lead-authored-by: Thang Long VU <[email protected]>
Co-authored-by: Thang Long Vu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 6 ++
.../spark/sql/catalyst/parser/SqlBaseParser.g4 | 6 +-
.../spark/sql/catalyst/analysis/Analyzer.scala | 39 +++++++---
.../sql/catalyst/analysis/CheckAnalysis.scala | 2 +-
.../spark/sql/catalyst/parser/AstBuilder.scala | 29 ++++++--
.../sql/catalyst/plans/logical/statements.scala | 4 +-
.../spark/sql/errors/QueryCompilationErrors.scala | 6 ++
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 84 ++++++++++++++++++++++
.../execution/datasources/DataSourceStrategy.scala | 11 +--
.../datasources/FallBackFileSourceV2.scala | 2 +-
.../spark/sql/execution/datasources/rules.scala | 11 ++-
.../sql-tests/analyzer-results/explain-aqe.sql.out | 2 +-
.../sql-tests/analyzer-results/explain.sql.out | 2 +-
.../sql-tests/results/explain-aqe.sql.out | 2 +-
.../resources/sql-tests/results/explain.sql.out | 2 +-
.../execution/command/PlanResolutionSuite.scala | 77 +++++++++++++++++++-
.../org/apache/spark/sql/sources/InsertSuite.scala | 37 ++++++++++
.../org/apache/spark/sql/hive/HiveStrategies.scala | 24 +++++--
.../org/apache/spark/sql/hive/InsertSuite.scala | 24 +++++++
19 files changed, 330 insertions(+), 40 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 396f1933af17..600dae8ac229 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -7137,6 +7137,12 @@
},
"sqlState" : "42809"
},
+ "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION" : {
+ "message" : [
+ "INSERT WITH SCHEMA EVOLUTION ... is unsupported for this table format."
+ ],
+ "sqlState" : "0A000"
+ },
"UNSUPPORTED_JOIN_TYPE" : {
"message" : [
"Unsupported join type '<typ>'. Supported join types include:
<supported>."
diff --git
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 1c9e95dee1a4..66457d8d5a56 100644
---
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -560,9 +560,9 @@ query
;
insertInto
- : INSERT OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
- | INSERT INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
- | INSERT INTO TABLE? identifierReference optionsClause? (BY NAME)? REPLACE whereClause #insertIntoReplaceWhere
+ : INSERT (WITH SCHEMA EVOLUTION)? OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
+ | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
+ | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference optionsClause? (BY NAME)? REPLACE whereClause #insertIntoReplaceWhere
| INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir
| INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
;
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 01f89026b08a..c12b9c62dab6 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1097,7 +1097,7 @@ class Analyzer(
def apply(plan: LogicalPlan)
: LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn,
ruleId) {
- case i @ InsertIntoStatement(table, _, _, _, _, _, _) =>
+ case i @ InsertIntoStatement(table, _, _, _, _, _, _, _) =>
val relation = table match {
case u: UnresolvedRelation if !u.isStreaming =>
resolveRelation(u).getOrElse(u)
@@ -1232,7 +1232,7 @@ class Analyzer(
object ResolveInsertInto extends ResolveInsertionBase {
override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsWithPruning(
AlwaysProcess.fn, ruleId) {
- case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _)
+ case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _,
_)
if i.query.resolved =>
// ifPartitionNotExists is append with validation, but validation is
not supported
if (i.ifPartitionNotExists) {
@@ -1250,27 +1250,50 @@ class Analyzer(
val partCols = partitionColumnNames(r.table)
validatePartitionSpec(partCols, i.partitionSpec)
+ val schemaEvolutionWriteOption: Map[String, String] =
+ if (i.withSchemaEvolution) Map("mergeSchema" -> "true") else
Map.empty
+
val staticPartitions =
i.partitionSpec.filter(_._2.isDefined).transform((_, v) => v.get)
val query = addStaticPartitionColumns(r,
projectByName.getOrElse(i.query), staticPartitions,
isByName)
if (!i.overwrite) {
if (isByName) {
- AppendData.byName(r, query)
+ AppendData.byName(
+ r,
+ query,
+ writeOptions = schemaEvolutionWriteOption)
} else {
- AppendData.byPosition(r, query)
+ AppendData.byPosition(
+ r,
+ query,
+ writeOptions = schemaEvolutionWriteOption)
}
} else if (conf.partitionOverwriteMode ==
PartitionOverwriteMode.DYNAMIC) {
if (isByName) {
- OverwritePartitionsDynamic.byName(r, query)
+ OverwritePartitionsDynamic.byName(
+ r,
+ query,
+ writeOptions = schemaEvolutionWriteOption)
} else {
- OverwritePartitionsDynamic.byPosition(r, query)
+ OverwritePartitionsDynamic.byPosition(
+ r,
+ query,
+ writeOptions = schemaEvolutionWriteOption)
}
} else {
if (isByName) {
- OverwriteByExpression.byName(r, query, staticDeleteExpression(r,
staticPartitions))
+ OverwriteByExpression.byName(
+ table = r,
+ df = query,
+ deleteExpr = staticDeleteExpression(r, staticPartitions),
+ writeOptions = schemaEvolutionWriteOption)
} else {
- OverwriteByExpression.byPosition(r, query,
staticDeleteExpression(r, staticPartitions))
+ OverwriteByExpression.byPosition(
+ table = r,
+ query = query,
+ deleteExpr = staticDeleteExpression(r, staticPartitions),
+ writeOptions = schemaEvolutionWriteOption)
}
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 530124a2ec94..5e251ceb222b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -254,7 +254,7 @@ trait CheckAnalysis extends LookupCatalog with
QueryErrorsBase with PlanToString
// not found first, instead of errors in the input query of the insert
command, by doing a
// top-down traversal.
plan.foreach {
- case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _) =>
+ case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _, _) =>
u.tableNotFound(u.multipartIdentifier)
// TODO (SPARK-27484): handle streaming write commands when we have them.
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index a7a0008a9efc..75a1f579a676 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -875,9 +875,12 @@ class AstBuilder extends DataTypeAstBuilder
/**
* Add an
* {{{
- * INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? [identifierList]
- * INSERT INTO [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | [identifierList])
- * INSERT INTO [TABLE] tableIdentifier REPLACE whereClause
+ * INSERT [WITH SCHEMA EVOLUTION] OVERWRITE
+ * TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? [identifierList]
+ * INSERT [WITH SCHEMA EVOLUTION] INTO
+ * [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | [identifierList])
+ * INSERT [WITH SCHEMA EVOLUTION] INTO
+ * [TABLE] tableIdentifier REPLACE whereClause
* INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat]
* INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList]
* }}}
@@ -906,7 +909,8 @@ class AstBuilder extends DataTypeAstBuilder
query = otherPlans.head,
overwrite = false,
ifPartitionNotExists = insertParams.ifPartitionNotExists,
- byName = insertParams.byName)
+ byName = insertParams.byName,
+ withSchemaEvolution = table.EVOLUTION() != null)
})
case table: InsertOverwriteTableContext =>
val insertParams = visitInsertOverwriteTable(table)
@@ -923,7 +927,8 @@ class AstBuilder extends DataTypeAstBuilder
query = otherPlans.head,
overwrite = true,
ifPartitionNotExists = insertParams.ifPartitionNotExists,
- byName = insertParams.byName)
+ byName = insertParams.byName,
+ withSchemaEvolution = table.EVOLUTION() != null)
})
case ctx: InsertIntoReplaceWhereContext =>
val options = Option(ctx.optionsClause())
@@ -932,10 +937,20 @@ class AstBuilder extends DataTypeAstBuilder
Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
isStreaming = false)
val deleteExpr = expression(ctx.whereClause().booleanExpression())
val isByName = ctx.NAME() != null
+ val schemaEvolutionWriteOption: Map[String, String] =
+ if (ctx.EVOLUTION() != null) Map("mergeSchema" -> "true") else
Map.empty
if (isByName) {
- OverwriteByExpression.byName(table, otherPlans.head, deleteExpr)
+ OverwriteByExpression.byName(
+ table,
+ df = otherPlans.head,
+ deleteExpr,
+ writeOptions = schemaEvolutionWriteOption)
} else {
- OverwriteByExpression.byPosition(table, otherPlans.head,
deleteExpr)
+ OverwriteByExpression.byPosition(
+ table,
+ query = otherPlans.head,
+ deleteExpr,
+ writeOptions = schemaEvolutionWriteOption)
}
})
case dir: InsertOverwriteDirContext =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index a9e0650010d4..0930cf17a1db 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -173,6 +173,7 @@ case class QualifiedColType(
* Only valid for static partitions.
* @param byName If true, reorder the data columns to match the
column names of the
* target table.
+ * @param withSchemaEvolution If true, enables automatic schema evolution for
the operation.
*/
case class InsertIntoStatement(
table: LogicalPlan,
@@ -181,7 +182,8 @@ case class InsertIntoStatement(
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean,
- byName: Boolean = false) extends UnaryParsedStatement {
+ byName: Boolean = false,
+ withSchemaEvolution: Boolean = false) extends UnaryParsedStatement {
require(overwrite || !ifPartitionNotExists,
"IF NOT EXISTS is only valid in INSERT OVERWRITE")
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 1241c213c911..901e857d7c47 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -576,6 +576,12 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
origin = t.origin)
}
+ def unsupportedInsertWithSchemaEvolution(): Throwable = {
+ new AnalysisException(
+ errorClass = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION",
+ messageParameters = Map.empty)
+ }
+
def writeIntoViewNotAllowedError(identifier: TableIdentifier, t:
TreeNode[_]): Throwable = {
new AnalysisException(
errorClass = "VIEW_WRITE_NOT_ALLOWED",
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index a0267d08dedc..f07e7edff72d 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1790,6 +1790,90 @@ class DDLParserSuite extends AnalysisTest {
Literal(5))))
}
+ for {
+ isByName <- Seq(true, false)
+ userSpecifiedCols <- if (!isByName) {
+ Seq(Seq("a", "b"), Seq.empty)
+ } else {
+ Seq(Seq.empty)
+ }
+ } {
+ val byNameClause = if (isByName) "BY NAME " else ""
+ val sourceQuery = "SELECT * FROM source"
+ val userSpecifiedColsClause =
+ if (userSpecifiedCols.isEmpty) "" else userSpecifiedCols.mkString("(",
", ", ")")
+ val testMsg = s"isByName=$isByName,
userSpecifiedColsClause=$userSpecifiedColsClause"
+
+ test(s"INSERT INTO with WITH SCHEMA EVOLUTION - $testMsg") {
+ val table = "testcat.ns1.ns2.tbl"
+ val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION INTO $table " +
+ s"${userSpecifiedColsClause}${byNameClause}${sourceQuery}"
+
+ parseCompare(
+ sql = insertSQLStmt,
+ expected = InsertIntoStatement(
+ table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+ partitionSpec = Map.empty,
+ userSpecifiedCols = userSpecifiedCols,
+ query = Project(Seq(UnresolvedStar(None)),
UnresolvedRelation(Seq("source"))),
+ overwrite = false,
+ ifPartitionNotExists = false,
+ byName = isByName,
+ withSchemaEvolution = true)
+ )
+ }
+
+ test(s"INSERT OVERWRITE (static) with WITH SCHEMA EVOLUTION - $testMsg") {
+ withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+ SQLConf.PartitionOverwriteMode.STATIC.toString) {
+ val table = "testcat.ns1.ns2.tbl"
+ val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION OVERWRITE $table " +
+ s"${userSpecifiedColsClause}${byNameClause}${sourceQuery}"
+
+ parseCompare(
+ sql = insertSQLStmt,
+ expected = InsertIntoStatement(
+ table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+ partitionSpec = Map.empty,
+ userSpecifiedCols = userSpecifiedCols,
+ query = Project(Seq(UnresolvedStar(None)),
UnresolvedRelation(Seq("source"))),
+ overwrite = true,
+ ifPartitionNotExists = false,
+ byName = isByName,
+ withSchemaEvolution = true)
+ )
+ }
+ }
+ }
+
+ for (isByName <- Seq(true, false)) {
+ val byNameClause = if (isByName) "BY NAME " else ""
+ val sourceQuery = "SELECT * FROM source"
+ val testMsg = s"isByName=$isByName"
+
+ test(s"INSERT OVERWRITE (dynamic) with WITH SCHEMA EVOLUTION - $testMsg") {
+ withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+ SQLConf.PartitionOverwriteMode.DYNAMIC.toString) {
+ val table = "testcat.ns1.ns2.tbl"
+ val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION OVERWRITE $table " +
+ s"${byNameClause}${sourceQuery}"
+
+ parseCompare(
+ sql = insertSQLStmt,
+ expected = InsertIntoStatement(
+ table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+ partitionSpec = Map.empty,
+ userSpecifiedCols = Seq.empty,
+ query = Project(Seq(UnresolvedStar(None)),
UnresolvedRelation(Seq("source"))),
+ overwrite = true,
+ ifPartitionNotExists = false,
+ byName = isByName,
+ withSchemaEvolution = true)
+ )
+ }
+ }
+ }
+
test("delete from table: delete all") {
parseCompare("DELETE FROM testcat.ns1.ns2.tbl",
DeleteFromTable(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 48b91a064e3f..93ad3ca3856e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -162,7 +162,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query,
query.output.map(_.name))
case InsertIntoStatement(l @ LogicalRelationWithTable(_:
InsertableRelation, _),
- parts, _, query, overwrite, false, _) if parts.isEmpty =>
+ parts, _, query, overwrite, false, _, _)
+ if parts.isEmpty =>
InsertIntoDataSourceCommand(l, query, overwrite)
case InsertIntoDir(_, storage, provider, query, overwrite)
@@ -173,8 +174,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)
- case i @ InsertIntoStatement(
- l @ LogicalRelationWithTable(t: HadoopFsRelation, table), parts, _,
query, overwrite, _, _)
+ case i @ InsertIntoStatement(l @ LogicalRelationWithTable(t:
HadoopFsRelation, table),
+ parts, _, query, overwrite, _, _, _)
if query.resolved =>
// If the InsertIntoTable command is for a partitioned HadoopFsRelation
and
// the user has specified static partitions, we add a Project operator
on top of the query
@@ -307,11 +308,11 @@ class FindDataSourceTable(sparkSession: SparkSession)
extends Rule[LogicalPlan]
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options,
false),
- _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
+ _, _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
i.copy(table = readDataSourceTable(tableMeta, options))
case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _,
false),
- _, _, _, _, _, _) =>
+ _, _, _, _, _, _, _) =>
i.copy(table = DDLUtils.readHiveTable(tableMeta))
case append @ AppendData(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
index 60c459ecf540..e03d6e6772fa 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
@@ -35,7 +35,7 @@ import
org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, FileTable}
class FallBackFileSourceV2(sparkSession: SparkSession) extends
Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoStatement(
- d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _) =>
+ d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _, _) =>
val v1FileFormat =
table.fallbackFileFormat.getDeclaredConstructor().newInstance()
val relation = HadoopFsRelation(
table.fileIndex,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b8e0627dfc43..f097e1aa6379 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -462,6 +462,10 @@ object PreprocessTableInsertion extends
ResolveInsertionBase {
partColNames: StructType,
catalogTable: Option[CatalogTable]): InsertIntoStatement = {
+ if (insert.withSchemaEvolution) {
+ throw QueryCompilationErrors.unsupportedInsertWithSchemaEvolution()
+ }
+
val normalizedPartSpec = normalizePartitionSpec(
insert.partitionSpec, partColNames, tblName, conf.resolver)
@@ -526,7 +530,8 @@ object PreprocessTableInsertion extends
ResolveInsertionBase {
}
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case i @ InsertIntoStatement(table, _, _, query, _, _, _) if
table.resolved && query.resolved =>
+ case i @ InsertIntoStatement(table, _, _, query, _, _, _, _)
+ if table.resolved && query.resolved =>
table match {
case relation: HiveTableRelation =>
val metadata = relation.tableMeta
@@ -606,7 +611,7 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case InsertIntoStatement(LogicalRelationWithTable(relation, _),
partition,
- _, query, _, _, _) =>
+ _, query, _, _, _, _) =>
// Get all input data source relations of the query.
val srcRelations = query.collect {
case l: LogicalRelation => l.relation
@@ -635,7 +640,7 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
messageParameters = Map("relationId" ->
toSQLId(relation.toString)))
}
- case InsertIntoStatement(t, _, _, _, _, _, _)
+ case InsertIntoStatement(t, _, _, _, _, _, _, _)
if !t.isInstanceOf[LeafNode] ||
t.isInstanceOf[Range] ||
t.isInstanceOf[OneRowRelation] ||
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
index 89ba8971a60d..4b9bb859cd56 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
@@ -208,7 +208,7 @@ ExplainCommand 'Aggregate ['key], ['key,
unresolvedalias('MIN('val))], Formatted
-- !query
EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
-- !query analysis
-ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5],
[__required_write_privileges__=INSERT], false, false, false, false, ExtendedMode
+ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5],
[__required_write_privileges__=INSERT], false, false, false, false, false,
ExtendedMode
-- !query
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
index 89ba8971a60d..4b9bb859cd56 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
@@ -208,7 +208,7 @@ ExplainCommand 'Aggregate ['key], ['key,
unresolvedalias('MIN('val))], Formatted
-- !query
EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
-- !query analysis
-ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5],
[__required_write_privileges__=INSERT], false, false, false, false, ExtendedMode
+ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5],
[__required_write_privileges__=INSERT], false, false, false, false, false,
ExtendedMode
-- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index f2df635a5a4f..be22c74f43b0 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -1183,7 +1183,7 @@ EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT *
FROM explain_temp4
struct<plan:string>
-- !query output
== Parsed Logical Plan ==
-'InsertIntoStatement 'UnresolvedRelation [explain_temp5],
[__required_write_privileges__=INSERT], false, false, false, false
+'InsertIntoStatement 'UnresolvedRelation [explain_temp5],
[__required_write_privileges__=INSERT], false, false, false, false, false
+- 'Project [*]
+- 'UnresolvedRelation [explain_temp4], [], false
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index 221a323b01bb..e6db39f7913c 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -1075,7 +1075,7 @@ EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT *
FROM explain_temp4
struct<plan:string>
-- !query output
== Parsed Logical Plan ==
-'InsertIntoStatement 'UnresolvedRelation [explain_temp5],
[__required_write_privileges__=INSERT], false, false, false, false
+'InsertIntoStatement 'UnresolvedRelation [explain_temp5],
[__required_write_privileges__=INSERT], false, false, false, false, false
+- 'Project [*]
+- 'UnresolvedRelation [explain_temp4], [], false
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 8e5ee1644f9c..65028eb1777e 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1249,7 +1249,7 @@ class PlanResolutionSuite extends SharedSparkSession with
AnalysisTest {
case InsertIntoStatement(
_, _, _,
UnresolvedInlineTable(_,
Seq(Seq(UnresolvedAttribute(Seq("DEFAULT"))))),
- _, _, _) =>
+ _, _, _, _) =>
case _ => fail("Expect UpdateTable, but got:\n" + parsed1.treeString)
}
@@ -1257,7 +1257,7 @@ class PlanResolutionSuite extends SharedSparkSession with
AnalysisTest {
case InsertIntoStatement(
_, _, _,
Project(Seq(UnresolvedAttribute(Seq("DEFAULT"))), _),
- _, _, _) =>
+ _, _, _, _) =>
case _ => fail("Expect UpdateTable, but got:\n" + parsed1.treeString)
}
@@ -1325,6 +1325,79 @@ class PlanResolutionSuite extends SharedSparkSession
with AnalysisTest {
}
}
+ for {
+ withSchemaEvolution <- Seq(true, false)
+ isByName <- Seq(true, false)
+ } {
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION " else ""
+ val byNameClause = if (isByName) "BY NAME " else ""
+ val testMsg = s"withSchemaEvolution=$withSchemaEvolution,
isByName=$isByName"
+
+ test(s"INSERT INTO: mergeSchema write option with WITH SCHEMA EVOLUTION -
$testMsg") {
+ val table = "testcat.tab"
+ val insertSQLStmt = s"INSERT ${schemaEvolutionClause}INTO $table
${byNameClause}"
+
+ val writeOptions = parseAndResolve(s"$insertSQLStmt SELECT * FROM
v2Table") match {
+ case appendData: AppendData =>
+ appendData.writeOptions
+ case other =>
+ fail(s"Expected AppendData, but got:
${other.getClass.getSimpleName}")
+ }
+ assert(writeOptions.get("mergeSchema") ===
+ (if (withSchemaEvolution) Some("true") else None))
+ }
+
+ test(s"INSERT OVERWRITE (static): mergeSchema write option with WITH
SCHEMA EVOLUTION - " +
+ testMsg) {
+ withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.STATIC.toString) {
+ val table = "testcat.tab"
+ val insertSQLStmt = s"INSERT ${schemaEvolutionClause}OVERWRITE $table
${byNameClause}"
+
+ val writeOptions = parseAndResolve(s"$insertSQLStmt SELECT * FROM
v2Table") match {
+ case overwriteByExpression: OverwriteByExpression =>
+ overwriteByExpression.writeOptions
+ case other =>
+ fail(s"Expected OverwriteByExpression, but got:
${other.getClass.getSimpleName}")
+ }
+ assert(writeOptions.get("mergeSchema") ===
+ (if (withSchemaEvolution) Some("true") else None))
+ }
+ }
+
+ test(s"INSERT OVERWRITE (dynamic): mergeSchema write option with WITH
SCHEMA EVOLUTION - " +
+ testMsg) {
+ withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+ PartitionOverwriteMode.DYNAMIC.toString) {
+ val table = "testcat.tab"
+ val insertSQLStmt = s"INSERT ${schemaEvolutionClause}OVERWRITE $table
${byNameClause}"
+
+ val writeOptions = parseAndResolve(s"$insertSQLStmt SELECT * FROM
v2Table") match {
+ case overwritePartitionsDynamic: OverwritePartitionsDynamic =>
+ overwritePartitionsDynamic.writeOptions
+ case other =>
+ fail(s"Expected OverwritePartitionsDynamic, but got:
${other.getClass.getSimpleName}")
+ }
+ assert(writeOptions.get("mergeSchema") ===
+ (if (withSchemaEvolution) Some("true") else None))
+ }
+ }
+
+ test(s"REPLACE WHERE: mergeSchema write option with WITH SCHEMA EVOLUTION
- $testMsg") {
+ val table = "testcat.tab"
+ val insertSQLStmt =
+ s"INSERT ${schemaEvolutionClause}INTO $table ${byNameClause}REPLACE
WHERE i = 1"
+
+ val writeOptions = parseAndResolve(s"$insertSQLStmt SELECT * FROM
v2Table") match {
+ case overwriteByExpression: OverwriteByExpression =>
+ overwriteByExpression.writeOptions
+ case other =>
+ fail(s"Expected OverwriteByExpression, but got:
${other.getClass.getSimpleName}")
+ }
+ assert(writeOptions.get("mergeSchema") ===
+ (if (withSchemaEvolution) Some("true") else None))
+ }
+ }
+
test("alter table: alter column") {
Seq("v1Table" -> true, "v2Table" -> false, "testcat.tab" -> false).foreach
{
case (tblName, useV1Command) =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 3cd289f7c6d8..d92e79645571 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -960,6 +960,43 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
}
}
+ test("SPARK-54971: INSERT WITH SCHEMA EVOLUTION is currently unsupported") {
+ withTable("test_table") {
+ val schema = new StructType()
+ .add("i", LongType, false)
+ .add("s", StringType, false)
+ val newTable = CatalogTable(
+ identifier = TableIdentifier("test_table", None),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ compressed = false,
+ properties = Map.empty),
+ schema = schema,
+ provider = Some(classOf[SimpleInsertSource].getName))
+
+ spark.sessionState.catalog.createTable(newTable, false)
+
+ sql("INSERT INTO TABLE test_table SELECT 1, 'a'")
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE test_table SELECT 1,
'a'")
+ },
+ condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+ )
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE test_table SELECT 1,
'a', 2")
+ },
+ condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+ )
+ }
+ }
+
test("Allow user to insert specified columns into insertable view") {
sql("INSERT OVERWRITE TABLE jsonTable SELECT a, DEFAULT FROM jt")
checkAnswer(
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 07d9df59b86d..28cb8d9e2040 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -153,7 +153,7 @@ class DetermineTableStats(session: SparkSession) extends
Rule[LogicalPlan] {
// handles InsertIntoStatement specially as the table in
InsertIntoStatement is not added in its
// children, hence not matched directly by previous HiveTableRelation case.
- case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, _)
+ case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _,
_, _)
if DDLUtils.isHiveTable(relation.tableMeta) &&
relation.tableMeta.stats.isEmpty =>
i.copy(table = hiveTableWithStats(relation))
}
@@ -168,7 +168,7 @@ class DetermineTableStats(session: SparkSession) extends
Rule[LogicalPlan] {
object HiveAnalysis extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case InsertIntoStatement(
- r: HiveTableRelation, partSpec, _, query, overwrite,
ifPartitionNotExists, _)
+ r: HiveTableRelation, partSpec, _, query, overwrite,
ifPartitionNotExists, _, _)
if DDLUtils.isHiveTable(r.tableMeta) && query.resolved =>
InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
ifPartitionNotExists, query.output.map(_.name))
@@ -236,13 +236,27 @@ case class RelationConversions(
plan resolveOperators {
// Write path
case InsertIntoStatement(
- r: HiveTableRelation, partition, cols, query, overwrite,
ifPartitionNotExists, byName)
+ r: HiveTableRelation,
+ partition,
+ cols,
+ query,
+ overwrite,
+ ifPartitionNotExists,
+ byName,
+ withSchemaEvolution)
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
((r.isPartitioned &&
conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)) ||
(!r.isPartitioned &&
conf.getConf(HiveUtils.CONVERT_INSERTING_UNPARTITIONED_TABLE)))
&& isConvertible(r) =>
- InsertIntoStatement(metastoreCatalog.convert(r, isWrite = true),
partition, cols,
- query, overwrite, ifPartitionNotExists, byName)
+ InsertIntoStatement(
+ metastoreCatalog.convert(r, isWrite = true),
+ partition,
+ cols,
+ query,
+ overwrite,
+ ifPartitionNotExists,
+ byName,
+ withSchemaEvolution)
// Read path
case relation: HiveTableRelation if
doConvertHiveTableRelationForRead(relation) =>
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index f9c001a1a077..2e45307d9102 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -451,6 +451,30 @@ class InsertSuite extends QueryTest with TestHiveSingleton
with BeforeAndAfter
}
}
+ testPartitionedTable("SPARK-54971: INSERT WITH SCHEMA EVOLUTION is currently
unsupported") {
+ tableName =>
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE $tableName SELECT 25,
26, 27, 28")
+ },
+ condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+ )
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE $tableName SELECT 25,
26, 27, 28, 29")
+ },
+ condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+ )
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE $tableName SELECT 25,
26, 27, (28, 29)")
+ },
+ condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+ )
+ }
+
testPartitionedTable("insertInto() should match columns by position and
ignore column names") {
tableName =>
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {