This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 69e5ba3b56d9 [SPARK-54971] Add WITH SCHEMA EVOLUTION syntax for SQL 
INSERT
69e5ba3b56d9 is described below

commit 69e5ba3b56d99a7c298eff93b62f85a73ad16ae6
Author: Thang Long VU <[email protected]>
AuthorDate: Tue Jan 13 18:53:27 2026 +0800

    [SPARK-54971] Add WITH SCHEMA EVOLUTION syntax for SQL INSERT
    
    ### What changes were proposed in this pull request?
    
    Similar to the [MERGE WITH SCHEMA EVOLUTION 
PR](https://github.com/apache/spark/pull/45748), **this PR introduces a syntax 
`WITH SCHEMA EVOLUTION` to the SQL `INSERT` command.** Since this syntax is not 
fully implemented for any table formats yet, **users will receive an exception 
if they try to use it.**
    
    When `WITH SCHEMA EVOLUTION` is specified, schema-evolution-related 
features must be turned on for that statement, and only for that statement.
    
    **In this PR, Spark is only responsible for recognizing the existence or 
absence of the syntax WITH SCHEMA EVOLUTION**. The parser records it in the 
`withSchemaEvolution` flag of `InsertIntoStatement` (for `INSERT ... REPLACE 
WHERE`, the parser sets the write option directly), and the `Analyzer` passes 
it down: when `WITH SCHEMA EVOLUTION` is detected, Spark sets the `mergeSchema` 
write option to `true` in the respective V2 insert command nodes.
    
    Data sources must respect the syntax and react appropriately: when the 
`WITH SCHEMA EVOLUTION` syntax is present, they should turn on the features 
that are categorised as "schema evolution".
    
    ### Why are the changes needed?
    
    This intuitive SQL Syntax allows the user to specify Automatic Schema 
Evolution for a specific `INSERT` operation.
    
    Some users would like schema evolution for DML commands such as `MERGE`, 
`INSERT`, etc., where the schemas of the target table and the query relation 
can mismatch.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it introduces the SQL syntax `WITH SCHEMA EVOLUTION` for SQL `INSERT`.
    
    ### How was this patch tested?
    
    Added UTs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53732 from longvu-db/insert-schema-evolution.
    
    Lead-authored-by: Thang Long VU <[email protected]>
    Co-authored-by: Thang Long Vu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  6 ++
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |  6 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 39 +++++++---
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 29 ++++++--
 .../sql/catalyst/plans/logical/statements.scala    |  4 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  6 ++
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 84 ++++++++++++++++++++++
 .../execution/datasources/DataSourceStrategy.scala | 11 +--
 .../datasources/FallBackFileSourceV2.scala         |  2 +-
 .../spark/sql/execution/datasources/rules.scala    | 11 ++-
 .../sql-tests/analyzer-results/explain-aqe.sql.out |  2 +-
 .../sql-tests/analyzer-results/explain.sql.out     |  2 +-
 .../sql-tests/results/explain-aqe.sql.out          |  2 +-
 .../resources/sql-tests/results/explain.sql.out    |  2 +-
 .../execution/command/PlanResolutionSuite.scala    | 77 +++++++++++++++++++-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 37 ++++++++++
 .../org/apache/spark/sql/hive/HiveStrategies.scala | 24 +++++--
 .../org/apache/spark/sql/hive/InsertSuite.scala    | 24 +++++++
 19 files changed, 330 insertions(+), 40 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 396f1933af17..600dae8ac229 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -7137,6 +7137,12 @@
     },
     "sqlState" : "42809"
   },
+  "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION" : {
+    "message" : [
+      "INSERT WITH SCHEMA EVOLUTION ... is unsupported for this table format."
+    ],
+    "sqlState" : "0A000"
+  },
   "UNSUPPORTED_JOIN_TYPE" : {
     "message" : [
       "Unsupported join type '<typ>'. Supported join types include: 
<supported>."
diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 1c9e95dee1a4..66457d8d5a56 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -560,9 +560,9 @@ query
     ;
 
 insertInto
-    : INSERT OVERWRITE TABLE? identifierReference optionsClause? 
(partitionSpec (IF errorCapturingNot EXISTS)?)?  ((BY NAME) | identifierList)? 
#insertOverwriteTable
-    | INSERT INTO TABLE? identifierReference optionsClause? partitionSpec? (IF 
errorCapturingNot EXISTS)? ((BY NAME) | identifierList)?   #insertIntoTable
-    | INSERT INTO TABLE? identifierReference optionsClause? (BY NAME)? REPLACE 
whereClause              #insertIntoReplaceWhere
+    : INSERT (WITH SCHEMA EVOLUTION)? OVERWRITE TABLE? identifierReference 
optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)?  ((BY NAME) | 
identifierList)? #insertOverwriteTable
+    | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference 
optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | 
identifierList)?   #insertIntoTable
+    | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference 
optionsClause? (BY NAME)? REPLACE whereClause              
#insertIntoReplaceWhere
     | INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? 
createFileFormat?                     #insertOverwriteHiveDir
     | INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider 
(OPTIONS options=propertyList)? #insertOverwriteDir
     ;
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 01f89026b08a..c12b9c62dab6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1097,7 +1097,7 @@ class Analyzer(
 
     def apply(plan: LogicalPlan)
         : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, 
ruleId) {
-      case i @ InsertIntoStatement(table, _, _, _, _, _, _) =>
+      case i @ InsertIntoStatement(table, _, _, _, _, _, _, _) =>
         val relation = table match {
           case u: UnresolvedRelation if !u.isStreaming =>
             resolveRelation(u).getOrElse(u)
@@ -1232,7 +1232,7 @@ class Analyzer(
   object ResolveInsertInto extends ResolveInsertionBase {
     override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsWithPruning(
       AlwaysProcess.fn, ruleId) {
-      case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _)
+      case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _, 
_)
           if i.query.resolved =>
         // ifPartitionNotExists is append with validation, but validation is 
not supported
         if (i.ifPartitionNotExists) {
@@ -1250,27 +1250,50 @@ class Analyzer(
         val partCols = partitionColumnNames(r.table)
         validatePartitionSpec(partCols, i.partitionSpec)
 
+        val schemaEvolutionWriteOption: Map[String, String] =
+          if (i.withSchemaEvolution) Map("mergeSchema" -> "true") else 
Map.empty
+
         val staticPartitions = 
i.partitionSpec.filter(_._2.isDefined).transform((_, v) => v.get)
         val query = addStaticPartitionColumns(r, 
projectByName.getOrElse(i.query), staticPartitions,
           isByName)
 
         if (!i.overwrite) {
           if (isByName) {
-            AppendData.byName(r, query)
+            AppendData.byName(
+              r,
+              query,
+              writeOptions = schemaEvolutionWriteOption)
           } else {
-            AppendData.byPosition(r, query)
+            AppendData.byPosition(
+              r,
+              query,
+              writeOptions = schemaEvolutionWriteOption)
           }
         } else if (conf.partitionOverwriteMode == 
PartitionOverwriteMode.DYNAMIC) {
           if (isByName) {
-            OverwritePartitionsDynamic.byName(r, query)
+            OverwritePartitionsDynamic.byName(
+              r,
+              query,
+              writeOptions = schemaEvolutionWriteOption)
           } else {
-            OverwritePartitionsDynamic.byPosition(r, query)
+            OverwritePartitionsDynamic.byPosition(
+              r,
+              query,
+              writeOptions = schemaEvolutionWriteOption)
           }
         } else {
           if (isByName) {
-            OverwriteByExpression.byName(r, query, staticDeleteExpression(r, 
staticPartitions))
+            OverwriteByExpression.byName(
+              table = r,
+              df = query,
+              deleteExpr = staticDeleteExpression(r, staticPartitions),
+              writeOptions = schemaEvolutionWriteOption)
           } else {
-            OverwriteByExpression.byPosition(r, query, 
staticDeleteExpression(r, staticPartitions))
+            OverwriteByExpression.byPosition(
+              table = r,
+              query = query,
+              deleteExpr = staticDeleteExpression(r, staticPartitions),
+              writeOptions = schemaEvolutionWriteOption)
           }
         }
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 530124a2ec94..5e251ceb222b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -254,7 +254,7 @@ trait CheckAnalysis extends LookupCatalog with 
QueryErrorsBase with PlanToString
     // not found first, instead of errors in the input query of the insert 
command, by doing a
     // top-down traversal.
     plan.foreach {
-      case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _) =>
+      case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _, _) =>
         u.tableNotFound(u.multipartIdentifier)
 
       // TODO (SPARK-27484): handle streaming write commands when we have them.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index a7a0008a9efc..75a1f579a676 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -875,9 +875,12 @@ class AstBuilder extends DataTypeAstBuilder
   /**
    * Add an
    * {{{
-   *   INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? 
[identifierList]
-   *   INSERT INTO [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | 
[identifierList])
-   *   INSERT INTO [TABLE] tableIdentifier REPLACE whereClause
+   *   INSERT [WITH SCHEMA EVOLUTION] OVERWRITE
+   *     TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? 
[identifierList]
+   *   INSERT [WITH SCHEMA EVOLUTION] INTO
+   *     [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | [identifierList])
+   *   INSERT [WITH SCHEMA EVOLUTION] INTO
+   *     [TABLE] tableIdentifier REPLACE whereClause
    *   INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat]
    *   INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS 
tablePropertyList]
    * }}}
@@ -906,7 +909,8 @@ class AstBuilder extends DataTypeAstBuilder
             query = otherPlans.head,
             overwrite = false,
             ifPartitionNotExists = insertParams.ifPartitionNotExists,
-            byName = insertParams.byName)
+            byName = insertParams.byName,
+            withSchemaEvolution = table.EVOLUTION() != null)
         })
       case table: InsertOverwriteTableContext =>
         val insertParams = visitInsertOverwriteTable(table)
@@ -923,7 +927,8 @@ class AstBuilder extends DataTypeAstBuilder
             query = otherPlans.head,
             overwrite = true,
             ifPartitionNotExists = insertParams.ifPartitionNotExists,
-            byName = insertParams.byName)
+            byName = insertParams.byName,
+            withSchemaEvolution = table.EVOLUTION() != null)
         })
       case ctx: InsertIntoReplaceWhereContext =>
         val options = Option(ctx.optionsClause())
@@ -932,10 +937,20 @@ class AstBuilder extends DataTypeAstBuilder
             Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), 
isStreaming = false)
           val deleteExpr = expression(ctx.whereClause().booleanExpression())
           val isByName = ctx.NAME() != null
+          val schemaEvolutionWriteOption: Map[String, String] =
+            if (ctx.EVOLUTION() != null) Map("mergeSchema" -> "true") else 
Map.empty
           if (isByName) {
-            OverwriteByExpression.byName(table, otherPlans.head, deleteExpr)
+            OverwriteByExpression.byName(
+              table,
+              df = otherPlans.head,
+              deleteExpr,
+              writeOptions = schemaEvolutionWriteOption)
           } else {
-            OverwriteByExpression.byPosition(table, otherPlans.head, 
deleteExpr)
+            OverwriteByExpression.byPosition(
+              table,
+              query = otherPlans.head,
+              deleteExpr,
+              writeOptions = schemaEvolutionWriteOption)
           }
         })
       case dir: InsertOverwriteDirContext =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index a9e0650010d4..0930cf17a1db 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -173,6 +173,7 @@ case class QualifiedColType(
  *                             Only valid for static partitions.
  * @param byName               If true, reorder the data columns to match the 
column names of the
  *                             target table.
+ * @param withSchemaEvolution  If true, enables automatic schema evolution for 
the operation.
  */
 case class InsertIntoStatement(
     table: LogicalPlan,
@@ -181,7 +182,8 @@ case class InsertIntoStatement(
     query: LogicalPlan,
     overwrite: Boolean,
     ifPartitionNotExists: Boolean,
-    byName: Boolean = false) extends UnaryParsedStatement {
+    byName: Boolean = false,
+    withSchemaEvolution: Boolean = false) extends UnaryParsedStatement {
 
   require(overwrite || !ifPartitionNotExists,
     "IF NOT EXISTS is only valid in INSERT OVERWRITE")
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 1241c213c911..901e857d7c47 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -576,6 +576,12 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       origin = t.origin)
   }
 
+  def unsupportedInsertWithSchemaEvolution(): Throwable = {
+    new AnalysisException(
+      errorClass = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION",
+      messageParameters = Map.empty)
+  }
+
   def writeIntoViewNotAllowedError(identifier: TableIdentifier, t: 
TreeNode[_]): Throwable = {
     new AnalysisException(
       errorClass = "VIEW_WRITE_NOT_ALLOWED",
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index a0267d08dedc..f07e7edff72d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1790,6 +1790,90 @@ class DDLParserSuite extends AnalysisTest {
           Literal(5))))
   }
 
+  for {
+    isByName <- Seq(true, false)
+    userSpecifiedCols <- if (!isByName) {
+      Seq(Seq("a", "b"), Seq.empty)
+    } else {
+      Seq(Seq.empty)
+    }
+  } {
+    val byNameClause = if (isByName) "BY NAME " else ""
+    val sourceQuery = "SELECT * FROM source"
+    val userSpecifiedColsClause =
+      if (userSpecifiedCols.isEmpty) "" else userSpecifiedCols.mkString("(", 
", ", ")")
+    val testMsg = s"isByName=$isByName, 
userSpecifiedColsClause=$userSpecifiedColsClause"
+
+    test(s"INSERT INTO with WITH SCHEMA EVOLUTION - $testMsg") {
+      val table = "testcat.ns1.ns2.tbl"
+      val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION INTO $table " +
+        s"${userSpecifiedColsClause}${byNameClause}${sourceQuery}"
+
+      parseCompare(
+        sql = insertSQLStmt,
+        expected = InsertIntoStatement(
+          table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+          partitionSpec = Map.empty,
+          userSpecifiedCols = userSpecifiedCols,
+          query = Project(Seq(UnresolvedStar(None)), 
UnresolvedRelation(Seq("source"))),
+          overwrite = false,
+          ifPartitionNotExists = false,
+          byName = isByName,
+          withSchemaEvolution = true)
+      )
+    }
+
+    test(s"INSERT OVERWRITE (static) with WITH SCHEMA EVOLUTION - $testMsg") {
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+        SQLConf.PartitionOverwriteMode.STATIC.toString) {
+        val table = "testcat.ns1.ns2.tbl"
+        val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION OVERWRITE $table " +
+          s"${userSpecifiedColsClause}${byNameClause}${sourceQuery}"
+
+        parseCompare(
+          sql = insertSQLStmt,
+          expected = InsertIntoStatement(
+            table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+            partitionSpec = Map.empty,
+            userSpecifiedCols = userSpecifiedCols,
+            query = Project(Seq(UnresolvedStar(None)), 
UnresolvedRelation(Seq("source"))),
+            overwrite = true,
+            ifPartitionNotExists = false,
+            byName = isByName,
+            withSchemaEvolution = true)
+        )
+      }
+    }
+  }
+
+  for (isByName <- Seq(true, false)) {
+    val byNameClause = if (isByName) "BY NAME " else ""
+    val sourceQuery = "SELECT * FROM source"
+    val testMsg = s"isByName=$isByName"
+
+    test(s"INSERT OVERWRITE (dynamic) with WITH SCHEMA EVOLUTION - $testMsg") {
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+        SQLConf.PartitionOverwriteMode.DYNAMIC.toString) {
+        val table = "testcat.ns1.ns2.tbl"
+        val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION OVERWRITE $table " +
+          s"${byNameClause}${sourceQuery}"
+
+        parseCompare(
+          sql = insertSQLStmt,
+          expected = InsertIntoStatement(
+            table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+            partitionSpec = Map.empty,
+            userSpecifiedCols = Seq.empty,
+            query = Project(Seq(UnresolvedStar(None)), 
UnresolvedRelation(Seq("source"))),
+            overwrite = true,
+            ifPartitionNotExists = false,
+            byName = isByName,
+            withSchemaEvolution = true)
+        )
+      }
+    }
+  }
+
   test("delete from table: delete all") {
     parseCompare("DELETE FROM testcat.ns1.ns2.tbl",
       DeleteFromTable(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 48b91a064e3f..93ad3ca3856e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -162,7 +162,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
       CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, 
query.output.map(_.name))
 
     case InsertIntoStatement(l @ LogicalRelationWithTable(_: 
InsertableRelation, _),
-        parts, _, query, overwrite, false, _) if parts.isEmpty =>
+        parts, _, query, overwrite, false, _, _)
+        if parts.isEmpty =>
       InsertIntoDataSourceCommand(l, query, overwrite)
 
     case InsertIntoDir(_, storage, provider, query, overwrite)
@@ -173,8 +174,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
 
       InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)
 
-    case i @ InsertIntoStatement(
-        l @ LogicalRelationWithTable(t: HadoopFsRelation, table), parts, _, 
query, overwrite, _, _)
+    case i @ InsertIntoStatement(l @ LogicalRelationWithTable(t: 
HadoopFsRelation, table),
+        parts, _, query, overwrite, _, _, _)
         if query.resolved =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation 
and
       // the user has specified static partitions, we add a Project operator 
on top of the query
@@ -307,11 +308,11 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, 
false),
-        _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
+        _, _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
       i.copy(table = readDataSourceTable(tableMeta, options))
 
     case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, 
false),
-        _, _, _, _, _, _) =>
+        _, _, _, _, _, _, _) =>
       i.copy(table = DDLUtils.readHiveTable(tableMeta))
 
     case append @ AppendData(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
index 60c459ecf540..e03d6e6772fa 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
@@ -35,7 +35,7 @@ import 
org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, FileTable}
 class FallBackFileSourceV2(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoStatement(
-        d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _) =>
+        d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _, _) =>
       val v1FileFormat = 
table.fallbackFileFormat.getDeclaredConstructor().newInstance()
       val relation = HadoopFsRelation(
         table.fileIndex,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b8e0627dfc43..f097e1aa6379 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -462,6 +462,10 @@ object PreprocessTableInsertion extends 
ResolveInsertionBase {
       partColNames: StructType,
       catalogTable: Option[CatalogTable]): InsertIntoStatement = {
 
+    if (insert.withSchemaEvolution) {
+      throw QueryCompilationErrors.unsupportedInsertWithSchemaEvolution()
+    }
+
     val normalizedPartSpec = normalizePartitionSpec(
       insert.partitionSpec, partColNames, tblName, conf.resolver)
 
@@ -526,7 +530,8 @@ object PreprocessTableInsertion extends 
ResolveInsertionBase {
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case i @ InsertIntoStatement(table, _, _, query, _, _, _) if 
table.resolved && query.resolved =>
+    case i @ InsertIntoStatement(table, _, _, query, _, _, _, _)
+      if table.resolved && query.resolved =>
       table match {
         case relation: HiveTableRelation =>
           val metadata = relation.tableMeta
@@ -606,7 +611,7 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case InsertIntoStatement(LogicalRelationWithTable(relation, _), 
partition,
-          _, query, _, _, _) =>
+          _, query, _, _, _, _) =>
         // Get all input data source relations of the query.
         val srcRelations = query.collect {
           case l: LogicalRelation => l.relation
@@ -635,7 +640,7 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
               messageParameters = Map("relationId" -> 
toSQLId(relation.toString)))
         }
 
-      case InsertIntoStatement(t, _, _, _, _, _, _)
+      case InsertIntoStatement(t, _, _, _, _, _, _, _)
         if !t.isInstanceOf[LeafNode] ||
           t.isInstanceOf[Range] ||
           t.isInstanceOf[OneRowRelation] ||
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
index 89ba8971a60d..4b9bb859cd56 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
@@ -208,7 +208,7 @@ ExplainCommand 'Aggregate ['key], ['key, 
unresolvedalias('MIN('val))], Formatted
 -- !query
 EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
 -- !query analysis
-ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], 
[__required_write_privileges__=INSERT], false, false, false, false, ExtendedMode
+ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], 
[__required_write_privileges__=INSERT], false, false, false, false, false, 
ExtendedMode
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
index 89ba8971a60d..4b9bb859cd56 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
@@ -208,7 +208,7 @@ ExplainCommand 'Aggregate ['key], ['key, 
unresolvedalias('MIN('val))], Formatted
 -- !query
 EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
 -- !query analysis
-ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], 
[__required_write_privileges__=INSERT], false, false, false, false, ExtendedMode
+ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], 
[__required_write_privileges__=INSERT], false, false, false, false, false, 
ExtendedMode
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out 
b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index f2df635a5a4f..be22c74f43b0 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -1183,7 +1183,7 @@ EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * 
FROM explain_temp4
 struct<plan:string>
 -- !query output
 == Parsed Logical Plan ==
-'InsertIntoStatement 'UnresolvedRelation [explain_temp5], 
[__required_write_privileges__=INSERT], false, false, false, false
+'InsertIntoStatement 'UnresolvedRelation [explain_temp5], 
[__required_write_privileges__=INSERT], false, false, false, false, false
 +- 'Project [*]
    +- 'UnresolvedRelation [explain_temp4], [], false
 
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out 
b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index 221a323b01bb..e6db39f7913c 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -1075,7 +1075,7 @@ EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * 
FROM explain_temp4
 struct<plan:string>
 -- !query output
 == Parsed Logical Plan ==
-'InsertIntoStatement 'UnresolvedRelation [explain_temp5], 
[__required_write_privileges__=INSERT], false, false, false, false
+'InsertIntoStatement 'UnresolvedRelation [explain_temp5], 
[__required_write_privileges__=INSERT], false, false, false, false, false
 +- 'Project [*]
    +- 'UnresolvedRelation [explain_temp4], [], false
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 8e5ee1644f9c..65028eb1777e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1249,7 +1249,7 @@ class PlanResolutionSuite extends SharedSparkSession with 
AnalysisTest {
       case InsertIntoStatement(
         _, _, _,
         UnresolvedInlineTable(_, 
Seq(Seq(UnresolvedAttribute(Seq("DEFAULT"))))),
-        _, _, _) =>
+        _, _, _, _) =>
 
       case _ => fail("Expect UpdateTable, but got:\n" + parsed1.treeString)
     }
@@ -1257,7 +1257,7 @@ class PlanResolutionSuite extends SharedSparkSession with 
AnalysisTest {
       case InsertIntoStatement(
         _, _, _,
         Project(Seq(UnresolvedAttribute(Seq("DEFAULT"))), _),
-        _, _, _) =>
+        _, _, _, _) =>
 
       case _ => fail("Expect UpdateTable, but got:\n" + parsed1.treeString)
     }
@@ -1325,6 +1325,79 @@ class PlanResolutionSuite extends SharedSparkSession 
with AnalysisTest {
     }
   }
 
+  for {
+    withSchemaEvolution <- Seq(true, false)
+    isByName <- Seq(true, false)
+  } {
+    val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION " else ""
+    val byNameClause = if (isByName) "BY NAME " else ""
+    val testMsg = s"withSchemaEvolution=$withSchemaEvolution, 
isByName=$isByName"
+
+    test(s"INSERT INTO: mergeSchema write option with WITH SCHEMA EVOLUTION - 
$testMsg") {
+      val table = "testcat.tab"
+      val insertSQLStmt = s"INSERT ${schemaEvolutionClause}INTO $table 
${byNameClause}"
+
+      val writeOptions = parseAndResolve(s"$insertSQLStmt SELECT * FROM 
v2Table") match {
+        case appendData: AppendData =>
+          appendData.writeOptions
+        case other =>
+          fail(s"Expected AppendData, but got: 
${other.getClass.getSimpleName}")
+      }
+      assert(writeOptions.get("mergeSchema") ===
+        (if (withSchemaEvolution) Some("true") else None))
+    }
+
+    test(s"INSERT OVERWRITE (static): mergeSchema write option with WITH 
SCHEMA EVOLUTION - " +
+        testMsg) {
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.STATIC.toString) {
+        val table = "testcat.tab"
+        val insertSQLStmt = s"INSERT ${schemaEvolutionClause}OVERWRITE $table 
${byNameClause}"
+
+        val writeOptions = parseAndResolve(s"$insertSQLStmt SELECT * FROM 
v2Table") match {
+          case overwriteByExpression: OverwriteByExpression =>
+            overwriteByExpression.writeOptions
+          case other =>
+            fail(s"Expected OverwriteByExpression, but got: 
${other.getClass.getSimpleName}")
+        }
+        assert(writeOptions.get("mergeSchema") ===
+          (if (withSchemaEvolution) Some("true") else None))
+      }
+    }
+
+    test(s"INSERT OVERWRITE (dynamic): mergeSchema write option with WITH 
SCHEMA EVOLUTION - " +
+        testMsg) {
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+        PartitionOverwriteMode.DYNAMIC.toString) {
+        val table = "testcat.tab"
+        val insertSQLStmt = s"INSERT ${schemaEvolutionClause}OVERWRITE $table 
${byNameClause}"
+
+        val writeOptions = parseAndResolve(s"$insertSQLStmt SELECT * FROM 
v2Table") match {
+          case overwritePartitionsDynamic: OverwritePartitionsDynamic =>
+            overwritePartitionsDynamic.writeOptions
+          case other =>
+            fail(s"Expected OverwritePartitionsDynamic, but got: 
${other.getClass.getSimpleName}")
+        }
+        assert(writeOptions.get("mergeSchema") ===
+          (if (withSchemaEvolution) Some("true") else None))
+      }
+    }
+
+    test(s"REPLACE WHERE: mergeSchema write option with WITH SCHEMA EVOLUTION 
- $testMsg") {
+      val table = "testcat.tab"
+      val insertSQLStmt =
+        s"INSERT ${schemaEvolutionClause}INTO $table ${byNameClause}REPLACE 
WHERE i = 1"
+
+      val writeOptions = parseAndResolve(s"$insertSQLStmt SELECT * FROM 
v2Table") match {
+        case overwriteByExpression: OverwriteByExpression =>
+          overwriteByExpression.writeOptions
+        case other =>
+          fail(s"Expected OverwriteByExpression, but got: 
${other.getClass.getSimpleName}")
+      }
+      assert(writeOptions.get("mergeSchema") ===
+        (if (withSchemaEvolution) Some("true") else None))
+    }
+  }
+
   test("alter table: alter column") {
     Seq("v1Table" -> true, "v2Table" -> false, "testcat.tab" -> false).foreach 
{
       case (tblName, useV1Command) =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 3cd289f7c6d8..d92e79645571 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -960,6 +960,43 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
     }
   }
 
+  test("SPARK-54971: INSERT WITH SCHEMA EVOLUTION is currently unsupported") {
+    withTable("test_table") {
+      val schema = new StructType()
+        .add("i", LongType, false)
+        .add("s", StringType, false)
+      val newTable = CatalogTable(
+        identifier = TableIdentifier("test_table", None),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = None,
+          outputFormat = None,
+          serde = None,
+          compressed = false,
+          properties = Map.empty),
+        schema = schema,
+        provider = Some(classOf[SimpleInsertSource].getName))
+
+      spark.sessionState.catalog.createTable(newTable, false)
+
+      sql("INSERT INTO TABLE test_table SELECT 1, 'a'")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE test_table SELECT 1, 
'a'")
+        },
+        condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+      )
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE test_table SELECT 1, 
'a', 2")
+        },
+        condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+      )
+    }
+  }
+
   test("Allow user to insert specified columns into insertable view") {
     sql("INSERT OVERWRITE TABLE jsonTable SELECT a, DEFAULT FROM jt")
     checkAnswer(
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 07d9df59b86d..28cb8d9e2040 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -153,7 +153,7 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
 
     // handles InsertIntoStatement specially as the table in 
InsertIntoStatement is not added in its
     // children, hence not matched directly by previous HiveTableRelation case.
-    case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, _)
+    case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, 
_, _)
       if DDLUtils.isHiveTable(relation.tableMeta) && 
relation.tableMeta.stats.isEmpty =>
       i.copy(table = hiveTableWithStats(relation))
   }
@@ -168,7 +168,7 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case InsertIntoStatement(
-        r: HiveTableRelation, partSpec, _, query, overwrite, 
ifPartitionNotExists, _)
+          r: HiveTableRelation, partSpec, _, query, overwrite, 
ifPartitionNotExists, _, _)
         if DDLUtils.isHiveTable(r.tableMeta) && query.resolved =>
       InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
         ifPartitionNotExists, query.output.map(_.name))
@@ -236,13 +236,27 @@ case class RelationConversions(
     plan resolveOperators {
       // Write path
       case InsertIntoStatement(
-          r: HiveTableRelation, partition, cols, query, overwrite, 
ifPartitionNotExists, byName)
+            r: HiveTableRelation,
+            partition,
+            cols,
+            query,
+            overwrite,
+            ifPartitionNotExists,
+            byName,
+            withSchemaEvolution)
           if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
             ((r.isPartitioned && 
conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)) ||
               (!r.isPartitioned && 
conf.getConf(HiveUtils.CONVERT_INSERTING_UNPARTITIONED_TABLE)))
             && isConvertible(r) =>
-        InsertIntoStatement(metastoreCatalog.convert(r, isWrite = true), 
partition, cols,
-          query, overwrite, ifPartitionNotExists, byName)
+        InsertIntoStatement(
+          metastoreCatalog.convert(r, isWrite = true),
+          partition,
+          cols,
+          query,
+          overwrite,
+          ifPartitionNotExists,
+          byName,
+          withSchemaEvolution)
 
       // Read path
       case relation: HiveTableRelation if 
doConvertHiveTableRelationForRead(relation) =>
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index f9c001a1a077..2e45307d9102 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -451,6 +451,30 @@ class InsertSuite extends QueryTest with TestHiveSingleton 
with BeforeAndAfter
       }
   }
 
+  testPartitionedTable("SPARK-54971: INSERT WITH SCHEMA EVOLUTION is currently 
unsupported") {
+    tableName =>
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE $tableName SELECT 25, 
26, 27, 28")
+        },
+        condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+      )
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE $tableName SELECT 25, 
26, 27, 28, 29")
+        },
+        condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+      )
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE $tableName SELECT 25, 
26, 27, (28, 29)")
+        },
+        condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+      )
+  }
+
   testPartitionedTable("insertInto() should match columns by position and 
ignore column names") {
     tableName =>
       withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to