This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 9e27d319175d [SPARK-56550][SQL][4.2] Support source with fewer 
columns/fields in INSERT INTO WITH SCHEMA EVOLUTION
9e27d319175d is described below

commit 9e27d319175dd6ac09ea0e2b578ab3fe1967fb81
Author: Szehon Ho <[email protected]>
AuthorDate: Wed May 20 21:07:14 2026 +0800

    [SPARK-56550][SQL][4.2] Support source with fewer columns/fields in INSERT 
INTO WITH SCHEMA EVOLUTION
    
    ## Summary
    
    Backport of #55427 to `branch-4.2`.
    
    Adds support for `INSERT INTO ... WITH SCHEMA EVOLUTION` to fill missing 
nested struct fields with null (or column defaults) when the source has fewer 
fields than the target table, mirroring existing `MERGE INTO` behavior gated by 
`spark.sql.mergeNestedTypeCoercion.enabled`.
    
    Key changes:
    - New config: `spark.sql.insertNestedTypeCoercion.enabled` (internal, 
default `false`)
    - Refactor `TableOutputResolver.resolveOutputColumns` to use 
`DefaultValueFillMode` (`NONE`, `FILL`, `RECURSE`)
    - Enable `RECURSE` mode for V2 inserts when schema evolution and the 
coercion flag are both enabled
    - 17 new tests in `InsertIntoSchemaEvolutionTests` (via 
`InsertIntoTests.scala`)
    
    ## Why are the changes needed?
    
    `MERGE INTO` already supports nested type coercion when the source has 
fewer struct fields than the target. `INSERT INTO WITH SCHEMA EVOLUTION` lacked 
this capability, causing errors for legitimate schema-evolution workflows where 
older sources omit newer nested fields.
    
    ## Does this PR introduce _any_ user-facing change?
    
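    Yes. When `spark.sql.insertNestedTypeCoercion.enabled` is set to `true` 
(default `false`), `INSERT INTO ... WITH SCHEMA EVOLUTION` fills missing nested 
struct fields with null instead of failing. For by-position inserts it also 
fills trailing missing top-level columns with their default value (or null) 
instead of raising a column-count error.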
    
    ## How was this patch tested?
    
    Cherry-picked cleanly (no conflicts) from #55427 onto current `branch-4.2` 
(`bd8872a0cc7`).
    
    Original PR test plan:
    - Added comprehensive positive/negative tests in `InsertIntoTests.scala`
    - All matched tests passed on master
    
    ## Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Cursor (Claude Opus 4)
    
    Closes #56002 from szehon-ho/insert-schema-evolution-missing-fields-4.2.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  10 +-
 .../catalyst/analysis/TableOutputResolver.scala    | 124 +++-
 .../org/apache/spark/sql/internal/SQLConf.scala    |  15 +
 .../spark/sql/execution/datasources/rules.scala    |   2 +-
 .../spark/sql/connector/InsertIntoTests.scala      | 738 +++++++++++++++++++++
 .../configs-without-binding-policy-exceptions      |   1 +
 6 files changed, 853 insertions(+), 37 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 09c6a0984258..faa78e030636 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkException, 
SparkUnsupportedOperationException}
 import org.apache.spark.internal.config.ConfigBindingPolicy
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
+import 
org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode._
 import org.apache.spark.sql.catalyst.analysis.resolver.{
   AnalyzerBridgeState,
   HybridAnalyzer,
@@ -3788,9 +3789,16 @@ class Analyzer(
         validateStoreAssignmentPolicy()
         TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
           expected = v2Write.table.output, queryOutput = v2Write.query.output)
+        // With schema evolution + coercion flag, missing top-level columns 
AND missing nested
+        // struct fields are filled with defaults/null (RECURSE mode). 
Otherwise, only missing
+        // top-level columns are filled via FILL mode; missing nested struct 
fields still cause
+        // schema enforcement errors.
+        val defaultValueFillMode =
+          if (conf.coerceInsertNestedTypes && v2Write.schemaEvolutionEnabled) 
RECURSE
+          else FILL
         val projection = TableOutputResolver.resolveOutputColumns(
           v2Write.table.name, v2Write.table.output, v2Write.query, 
v2Write.isByName, conf,
-          supportColDefaultValue = true)
+          defaultValueFillMode)
         if (projection != v2Write.query) {
           val cleanedTable = v2Write.table match {
             case r: DataSourceV2Relation =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 7eacc5ab9b2a..d691c449733f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -41,9 +41,10 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
 
   /**
    * Modes for filling in default or null values for missing columns.
-   * If FILL, fill missing top-level columns with their default values.
-   * If RECURSE, fill missing top-level columns and also recurse into nested 
struct
-   * fields to fill null.
+   * If FILL, fill missing top-level columns with their default values 
(by-name reorder path).
+   * If RECURSE, fill missing top-level columns (including trailing columns on 
the by-position
+   * path for INSERT with schema evolution when enabled) and recurse into 
nested structs,
+   * arrays, and maps to fill missing struct fields with null or defaults.
    * If NONE, do not fill any missing columns.
    */
   object DefaultValueFillMode extends Enumeration {
@@ -92,19 +93,22 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       query: LogicalPlan,
       byName: Boolean,
       conf: SQLConf,
-      supportColDefaultValue: Boolean = false): LogicalPlan = {
+      defaultValueFillMode: DefaultValueFillMode.Value = NONE): LogicalPlan = {
 
     if (expected.size < query.output.size) {
       throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
         tableName, expected.map(_.name), query.output)
     }
 
+    // In RECURSE mode, allow fewer source columns than target by filling 
trailing columns
+    // with defaults. In other modes, a column count mismatch in by-position 
resolution is
+    // an error.
+    val fillDefaultValue = defaultValueFillMode == RECURSE
     val errors = new mutable.ArrayBuffer[String]()
     val resolved: Seq[NamedExpression] = if (byName) {
-      // If a top-level column does not have a corresponding value in the 
input query, fill with
-      // the column's default value. We need to pass `fillDefaultValue` as 
FILL here, if the
-      // `supportColDefaultValue` parameter is also true.
-      val defaultValueFillMode = if (supportColDefaultValue) FILL else NONE
+      // By-name resolution: the defaultValueFillMode is passed through to 
control whether
+      // missing top-level columns are filled (FILL/RECURSE) and whether 
missing nested
+      // struct fields are also filled (RECURSE only).
       reorderColumnsByName(
         tableName,
         query.output,
@@ -112,13 +116,15 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
         conf,
         errors += _,
         Nil,
-        defaultValueFillMode)
+        defaultValueFillMode,
+        enforceFullOutput = true)
     } else {
-      if (expected.size > query.output.size) {
+      if (expected.size > query.output.size && !fillDefaultValue) {
         throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
           tableName, expected.map(_.name), query.output)
       }
-      resolveColumnsByPosition(tableName, query.output, expected, conf, errors 
+= _)
+      resolveColumnsByPosition(
+        tableName, query.output, expected, conf, errors += _, fillDefaultValue 
= fillDefaultValue)
     }
 
     if (errors.nonEmpty) {
@@ -157,17 +163,17 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       case (valueType: StructType, colType: StructType) =>
         val resolvedValue = resolveStructType(
           tableName, value, valueType, col, colType,
-          byName = true, conf, addError, colPath, fillChildDefaultValue)
+          byName = true, conf, addError, colPath, fillChildDefaultValue, 
enforceFullOutput = false)
         resolvedValue.getOrElse(value)
       case (valueType: ArrayType, colType: ArrayType) =>
         val resolvedValue = resolveArrayType(
           tableName, value, valueType, col, colType,
-          byName = true, conf, addError, colPath, fillChildDefaultValue)
+          byName = true, conf, addError, colPath, fillChildDefaultValue, 
enforceFullOutput = false)
         resolvedValue.getOrElse(value)
       case (valueType: MapType, colType: MapType) =>
         val resolvedValue = resolveMapType(
           tableName, value, valueType, col, colType,
-          byName = true, conf, addError, colPath, fillChildDefaultValue)
+          byName = true, conf, addError, colPath, fillChildDefaultValue, 
enforceFullOutput = false)
         resolvedValue.getOrElse(value)
       case _ =>
         checkUpdate(tableName, value, col, conf, addError, colPath)
@@ -304,7 +310,8 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       conf: SQLConf,
       addError: String => Unit,
       colPath: Seq[String] = Nil,
-      defaultValueFillMode: DefaultValueFillMode.Value): Seq[NamedExpression] 
= {
+      defaultValueFillMode: DefaultValueFillMode.Value,
+      enforceFullOutput: Boolean = false): Seq[NamedExpression] = {
     val matchedCols = mutable.HashSet.empty[String]
     val reordered = expectedCols.flatMap { expectedCol =>
       val matched = inputCols.filter(col => conf.resolver(col.name, 
expectedCol.name))
@@ -336,15 +343,15 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
           case (matchedType: StructType, expectedType: StructType) =>
             resolveStructType(
               tableName, matchedCol, matchedType, actualExpectedCol, 
expectedType,
-              byName = true, conf, addError, newColPath, childFillDefaultValue)
+              byName = true, conf, addError, newColPath, 
childFillDefaultValue, enforceFullOutput)
           case (matchedType: ArrayType, expectedType: ArrayType) =>
             resolveArrayType(
               tableName, matchedCol, matchedType, actualExpectedCol, 
expectedType,
-              byName = true, conf, addError, newColPath, childFillDefaultValue)
+              byName = true, conf, addError, newColPath, 
childFillDefaultValue, enforceFullOutput)
           case (matchedType: MapType, expectedType: MapType) =>
             resolveMapType(
               tableName, matchedCol, matchedType, actualExpectedCol, 
expectedType,
-              byName = true, conf, addError, newColPath, childFillDefaultValue)
+              byName = true, conf, addError, newColPath, 
childFillDefaultValue, enforceFullOutput)
           case _ =>
             checkField(
               tableName, actualExpectedCol, matchedCol, byName = true, conf, 
addError, newColPath)
@@ -366,6 +373,11 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       } else {
         reordered
       }
+    } else if (enforceFullOutput) {
+      val colName =
+        if (colPath.nonEmpty) colPath.quoted
+        else expectedCols.map(_.name).map(toSQLId).mkString(", ")
+      throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, 
colName)
     } else {
       Nil
     }
@@ -377,7 +389,8 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       expectedCols: Seq[Attribute],
       conf: SQLConf,
       addError: String => Unit,
-      colPath: Seq[String] = Nil): Seq[NamedExpression] = {
+      colPath: Seq[String] = Nil,
+      fillDefaultValue: Boolean = false): Seq[NamedExpression] = {
     val actualExpectedCols = expectedCols.map { attr =>
       attr.withDataType { 
CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType) }
     }
@@ -393,7 +406,7 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
           tableName, colPath.quoted, extraColsStr
         )
       }
-    } else if (inputCols.size < actualExpectedCols.size) {
+    } else if (inputCols.size < actualExpectedCols.size && !fillDefaultValue) {
       val missingColsStr = 
actualExpectedCols.takeRight(actualExpectedCols.size - inputCols.size)
         .map(col => toSQLId(col.name))
         .mkString(", ")
@@ -407,25 +420,48 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       }
     }
 
-    inputCols.zip(actualExpectedCols).flatMap { case (inputCol, expectedCol) =>
+    val matched = inputCols.zip(actualExpectedCols).flatMap { case (inputCol, 
expectedCol) =>
       val newColPath = colPath :+ expectedCol.name
       (inputCol.dataType, expectedCol.dataType) match {
         case (inputType: StructType, expectedType: StructType) =>
           resolveStructType(
             tableName, inputCol, inputType, expectedCol, expectedType,
-            byName = false, conf, addError, newColPath, fillDefaultValue = 
false)
+            byName = false, conf, addError, newColPath, fillDefaultValue, 
enforceFullOutput = true)
         case (inputType: ArrayType, expectedType: ArrayType) =>
           resolveArrayType(
             tableName, inputCol, inputType, expectedCol, expectedType,
-            byName = false, conf, addError, newColPath, fillDefaultValue = 
false)
+            byName = false, conf, addError, newColPath, fillDefaultValue, 
enforceFullOutput = true)
         case (inputType: MapType, expectedType: MapType) =>
           resolveMapType(
             tableName, inputCol, inputType, expectedCol, expectedType,
-            byName = false, conf, addError, newColPath, fillDefaultValue = 
false)
+            byName = false, conf, addError, newColPath, fillDefaultValue, 
enforceFullOutput = true)
         case _ =>
           checkField(tableName, expectedCol, inputCol, byName = false, conf, 
addError, newColPath)
       }
     }
+
+    val defaults = if (fillDefaultValue) {
+      actualExpectedCols.drop(inputCols.size).map { expectedCol =>
+        val defaultExpr = getDefaultValueExprOrNullLit(
+          expectedCol, conf.useNullsForMissingDefaultColumnValues)
+        if (defaultExpr.isEmpty) {
+          throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(
+            tableName, (colPath :+ expectedCol.name).quoted)
+        }
+        applyColumnMetadata(defaultExpr.get, expectedCol)
+      }
+    } else {
+      Nil
+    }
+
+    val result = matched ++ defaults
+    if (result.length != actualExpectedCols.size) {
+      val colName =
+        if (colPath.nonEmpty) colPath.quoted
+        else actualExpectedCols.map(_.name).map(toSQLId).mkString(", ")
+      throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, 
colName)
+    }
+    result
   }
 
   private[sql] def checkNullability(
@@ -447,6 +483,7 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
     input.nullable && !attr.nullable && conf.storeAssignmentPolicy != 
StoreAssignmentPolicy.LEGACY
   }
 
+  // scalastyle:off argcount
   private def resolveStructType(
       tableName: String,
       input: Expression,
@@ -457,7 +494,8 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       conf: SQLConf,
       addError: String => Unit,
       colPath: Seq[String],
-      fillDefaultValue: Boolean): Option[NamedExpression] = {
+      fillDefaultValue: Boolean,
+      enforceFullOutput: Boolean): Option[NamedExpression] = {
     val nullCheckedInput = checkNullability(input, expected, conf, colPath)
     val fields = inputType.zipWithIndex.map { case (f, i) =>
       Alias(GetStructField(nullCheckedInput, i, Some(f.name)), f.name)()
@@ -465,10 +503,10 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
     val defaultValueMode = if (fillDefaultValue) RECURSE else NONE
     val resolved = if (byName) {
       reorderColumnsByName(tableName, fields, toAttributes(expectedType), 
conf, addError, colPath,
-        defaultValueMode)
+        defaultValueMode, enforceFullOutput)
     } else {
       resolveColumnsByPosition(
-        tableName, fields, toAttributes(expectedType), conf, addError, colPath)
+        tableName, fields, toAttributes(expectedType), conf, addError, 
colPath, fillDefaultValue)
     }
     if (resolved.length == expectedType.length) {
       val struct = CreateStruct(resolved)
@@ -478,6 +516,11 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
         struct
       }
       Some(applyColumnMetadata(res, expected))
+    } else if (enforceFullOutput) {
+      val colName =
+        if (colPath.nonEmpty) colPath.quoted
+        else expectedType.fields.map(_.name).map(toSQLId).mkString(", ")
+      throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, 
colName)
     } else {
       None
     }
@@ -493,7 +536,8 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       conf: SQLConf,
       addError: String => Unit,
       colPath: Seq[String],
-      fillDefaultValue: Boolean): Option[NamedExpression] = {
+      fillDefaultValue: Boolean,
+      enforceFullOutput: Boolean): Option[NamedExpression] = {
     val nullCheckedInput = checkNullability(input, expected, conf, colPath)
     val param = NamedLambdaVariable("element", inputType.elementType, 
inputType.containsNull)
     val fakeAttr =
@@ -501,9 +545,10 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
     val res = if (byName) {
       val defaultValueMode = if (fillDefaultValue) RECURSE else NONE
       reorderColumnsByName(tableName, Seq(param), Seq(fakeAttr), conf, 
addError, colPath,
-        defaultValueMode)
+        defaultValueMode, enforceFullOutput)
     } else {
-      resolveColumnsByPosition(tableName, Seq(param), Seq(fakeAttr), conf, 
addError, colPath)
+      resolveColumnsByPosition(
+        tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath, 
fillDefaultValue)
     }
     if (res.length == 1) {
       val castedArray =
@@ -515,6 +560,9 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
           ArrayTransform(nullCheckedInput, func)
         }
       Some(applyColumnMetadata(castedArray, expected))
+    } else if (enforceFullOutput) {
+      val colName = if (colPath.nonEmpty) colPath.quoted else 
toSQLId(expected.name)
+      throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, 
colName)
     } else {
       None
     }
@@ -530,7 +578,8 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       conf: SQLConf,
       addError: String => Unit,
       colPath: Seq[String],
-      fillDefaultValue: Boolean): Option[NamedExpression] = {
+      fillDefaultValue: Boolean,
+      enforceFullOutput: Boolean): Option[NamedExpression] = {
     val nullCheckedInput = checkNullability(input, expected, conf, colPath)
 
     val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable = 
false)
@@ -538,9 +587,10 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
     val defaultValueFillMode = if (fillDefaultValue) RECURSE else NONE
     val resKey = if (byName) {
       reorderColumnsByName(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, 
addError, colPath,
-        defaultValueFillMode)
+        defaultValueFillMode, enforceFullOutput)
     } else {
-      resolveColumnsByPosition(tableName, Seq(keyParam), Seq(fakeKeyAttr), 
conf, addError, colPath)
+      resolveColumnsByPosition(
+        tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath, 
fillDefaultValue)
     }
 
     val valueParam =
@@ -549,10 +599,10 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       AttributeReference("value", expectedType.valueType, 
expectedType.valueContainsNull)()
     val resValue = if (byName) {
       reorderColumnsByName(tableName, Seq(valueParam), Seq(fakeValueAttr), 
conf, addError, colPath,
-        defaultValueFillMode)
+        defaultValueFillMode, enforceFullOutput)
     } else {
       resolveColumnsByPosition(
-        tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, 
colPath)
+        tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, 
colPath, fillDefaultValue)
     }
 
     if (resKey.length == 1 && resValue.length == 1) {
@@ -577,10 +627,14 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
           MapFromArrays(newKeys, newValues)
         }
       Some(applyColumnMetadata(casted, expected))
+    } else if (enforceFullOutput) {
+      val colName = if (colPath.nonEmpty) colPath.quoted else 
toSQLId(expected.name)
+      throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, 
colName)
     } else {
       None
     }
   }
+  // scalastyle:on argcount
 
   // For table insertions, capture the overflow errors and show proper message.
   // Without this method, the overflow errors of castings will show hints for 
turning off ANSI SQL
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fcb736e40485..29a734f05598 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -7270,6 +7270,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val INSERT_INTO_NESTED_TYPE_COERCION_ENABLED =
+    buildConf("spark.sql.insertNestedTypeCoercion.enabled")
+      .internal()
+      .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill 
missing nested " +
+        "struct fields with null when the source has fewer nested fields than 
the target " +
+        "table. Also relaxes by-position column-count enforcement so trailing 
missing " +
+        "top-level columns are filled with their default value (or null). This 
is " +
+        "experimental and the semantics may change.")
+      .version("4.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val TIME_TYPE_ENABLED =
     buildConf("spark.sql.timeType.enabled")
       .internal()
@@ -8597,6 +8609,9 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
   def coerceMergeNestedTypes: Boolean =
     getConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED)
 
+  def coerceInsertNestedTypes: Boolean =
+    getConf(SQLConf.INSERT_INTO_NESTED_TYPE_COERCION_ENABLED)
+
   def isTimeTypeEnabled: Boolean = getConf(SQLConf.TIME_TYPE_ENABLED)
 
   def listaggAllowDistinctCastWithOrder: Boolean = 
getConf(LISTAGG_ALLOW_DISTINCT_CAST_WITH_ORDER)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index ce41bbe4aeb3..7122dd52ef1a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -528,7 +528,7 @@ object PreprocessTableInsertion extends 
ResolveInsertionBase {
         query,
         byName,
         conf,
-        supportColDefaultValue = true)
+        TableOutputResolver.DefaultValueFillMode.FILL)
     } catch {
       case e: AnalysisException if staticPartCols.nonEmpty &&
         (e.getCondition == 
"INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" ||
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
index 4f023136a6fe..42017c2dd60e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
@@ -574,6 +574,26 @@ trait InsertIntoSchemaEvolutionTests { this: 
InsertIntoTests =>
       byName: Boolean = false,
       replaceWhere: Option[String] = None): Unit
 
+  /** Insert data into a table by name without schema evolution. */
+  protected def doInsertByName(
+      tableName: String,
+      insert: DataFrame,
+      mode: SaveMode = SaveMode.Append): Unit = {
+    val tmpView = "tmp_view"
+    withTempView(tmpView) {
+      insert.createOrReplaceTempView(tmpView)
+      val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO"
+      sql(s"INSERT $overwrite TABLE $tableName BY NAME SELECT * FROM $tmpView")
+    }
+  }
+
+  /** Run a block with INSERT nested type coercion enabled. */
+  protected def withInsertNestedTypeCoercion(f: => Unit): Unit = {
+    withSQLConf(SQLConf.INSERT_INTO_NESTED_TYPE_COERCION_ENABLED.key -> 
"true") {
+      f
+    }
+  }
+
   test("Insert schema evolution: extra column - no auto-schema-evolution 
capability") {
     val t1 = s"${catalogAndNamespace}tbl"
     withTable(t1) {
@@ -1403,4 +1423,722 @@ trait InsertIntoSchemaEvolutionTests { this: 
InsertIntoTests =>
       assert(spark.table(t1).schema("id").dataType === IntegerType)
     }
   }
+
+  // 
---------------------------------------------------------------------------
+  // Tests for source with fewer columns/fields than target
+  // 
---------------------------------------------------------------------------
+
+  test("Insert schema evolution: source missing top-level column by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("salary", IntegerType),
+        StructField("dep", StringType)))
+      val data = Seq(Row(0, 100, "sales"))
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, spark.createDataFrame(spark.sparkContext.parallelize(data), 
schema))
+      doInsertWithSchemaEvolution(t1,
+        Seq((1, "engineering")).toDF("id", "dep"),
+        byName = true)
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, null, "engineering")))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column by position") 
{
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // By position: source col 1 maps to target col 1, source col 2 maps to 
target col 2,
+      // trailing target col 3 is filled with null.
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, 200)).toDF("id", "salary"))
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, null)))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column with DEFAULT 
by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int DEFAULT 200, dep string) 
USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      doInsertWithSchemaEvolution(t1,
+        Seq((1, "engineering")).toDF("id", "dep"),
+        byName = true)
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, "engineering")))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column with DEFAULT 
by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string DEFAULT 
'unknown') USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, 200)).toDF("id", "salary"))
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, "unknown")))
+    }
+  }
+
+  test("Insert schema evolution: source missing nested struct field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) 
USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), 
targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), 
sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+    }
+  }
+
+  test("Insert schema evolution: source missing nested struct field by 
position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) 
USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), 
targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), 
sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+    }
+  }
+
+  // Array elements in the table are struct<c1,c2,c3>; the source elements only have c1 and c2.
+  // With `byName = true`, the inserted element is expected to carry a null c3.
+  test("Insert schema evolution: source missing field in struct nested in array by name") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, " +
+        s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+      val fullElem = new StructType()
+        .add("c1", IntegerType)
+        .add("c2", StringType)
+        .add("c3", BooleanType)
+      val tableSchema = new StructType().add("id", IntegerType).add("a", ArrayType(fullElem))
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true)))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      // Source array elements omit c3.
+      val narrowElem = new StructType().add("c1", IntegerType).add("c2", StringType)
+      val incomingSchema = new StructType().add("id", IntegerType).add("a", ArrayType(narrowElem))
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b")))))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(tbl, incoming, byName = true)
+      }
+
+      val expected = Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null))))
+      checkAnswer(sql(s"SELECT * FROM $tbl"), expected)
+    }
+  }
+
+  // Same array-of-struct scenario as the by-name variant, but the insert is issued without
+  // `byName`; the inserted element is still expected to carry a null c3.
+  test("Insert schema evolution: source missing field in struct nested in array by position") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, " +
+        s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+      val fullElem = new StructType()
+        .add("c1", IntegerType)
+        .add("c2", StringType)
+        .add("c3", BooleanType)
+      val tableSchema = new StructType().add("id", IntegerType).add("a", ArrayType(fullElem))
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true)))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      // Source array elements omit c3.
+      val narrowElem = new StructType().add("c1", IntegerType).add("c2", StringType)
+      val incomingSchema = new StructType().add("id", IntegerType).add("a", ArrayType(narrowElem))
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b")))))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(tbl, incoming)
+      }
+
+      val expected = Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null))))
+      checkAnswer(sql(s"SELECT * FROM $tbl"), expected)
+    }
+  }
+
+  // The table nests struct<a,b> inside `s.c2`; the source's inner struct has only `a`.
+  // With `byName = true`, the inserted row is expected to have a null `s.c2.b`.
+  test("Insert schema evolution: source missing deeply nested struct field by name") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, " +
+        s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+      val fullInner = new StructType().add("a", IntegerType).add("b", BooleanType)
+      val fullOuter = new StructType().add("c1", IntegerType).add("c2", fullInner)
+      val tableSchema = new StructType().add("id", IntegerType).add("s", fullOuter)
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true)))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      // Source inner struct lacks `b`.
+      val narrowInner = new StructType().add("a", IntegerType)
+      val narrowOuter = new StructType().add("c1", IntegerType).add("c2", narrowInner)
+      val incomingSchema = new StructType().add("id", IntegerType).add("s", narrowOuter)
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, Row(20, Row(30)))))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(tbl, incoming, byName = true)
+      }
+
+      val expected = Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, Row(30, null))))
+      checkAnswer(sql(s"SELECT * FROM $tbl"), expected)
+    }
+  }
+
+  // The source struct type lacks c3 and the only source row has a null struct. With
+  // `byName = true`, the stored struct is expected to stay null.
+  test("Insert schema evolution: source with null struct and missing nested field by name") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+      val fullStruct = new StructType()
+        .add("c1", IntegerType)
+        .add("c2", StringType)
+        .add("c3", IntegerType)
+      val tableSchema = new StructType().add("id", IntegerType).add("s", fullStruct)
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      // Source struct type stops at c2, and the struct value itself is null.
+      val narrowStruct = new StructType().add("c1", IntegerType).add("c2", StringType)
+      val incomingSchema = new StructType().add("id", IntegerType).add("s", narrowStruct)
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, null)))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(tbl, incoming, byName = true)
+      }
+
+      val expected = Seq(Row(0, Row(1, "a", 10)), Row(1, null))
+      checkAnswer(sql(s"SELECT * FROM $tbl"), expected)
+    }
+  }
+
+  // Same null-struct scenario as the by-name variant, but the insert is issued without
+  // `byName`; the stored struct is expected to stay null.
+  test("Insert schema evolution: source with null struct and missing nested field by position") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+      val fullStruct = new StructType()
+        .add("c1", IntegerType)
+        .add("c2", StringType)
+        .add("c3", IntegerType)
+      val tableSchema = new StructType().add("id", IntegerType).add("s", fullStruct)
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      // Source struct type stops at c2, and the struct value itself is null.
+      val narrowStruct = new StructType().add("c1", IntegerType).add("c2", StringType)
+      val incomingSchema = new StructType().add("id", IntegerType).add("s", narrowStruct)
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, null)))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(tbl, incoming)
+      }
+
+      val expected = Seq(Row(0, Row(1, "a", 10)), Row(1, null))
+      checkAnswer(sql(s"SELECT * FROM $tbl"), expected)
+    }
+  }
+
+  // The source has one row with a populated (narrower) struct and one row with a null struct.
+  // Expected: the populated struct gains a null c3, while the null struct stays null.
+  test("Insert schema evolution: mixed null and non-null structs with missing field by name") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val fullStruct = new StructType()
+        .add("c1", IntegerType)
+        .add("c2", StringType)
+        .add("c3", BooleanType)
+      val tableSchema = new StructType().add("id", IntegerType).add("s", fullStruct)
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      // Two source rows: a non-null struct without c3, and a null struct.
+      val narrowStruct = new StructType().add("c1", IntegerType).add("c2", StringType)
+      val incomingSchema = new StructType().add("id", IntegerType).add("s", narrowStruct)
+      val incomingRows = spark.sparkContext.parallelize(
+        Seq(Row(1, Row(10, "b")), Row(2, null)))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(tbl, incoming, byName = true)
+      }
+
+      val expected = Seq(
+        Row(0, Row(1, "a", true)),
+        Row(1, Row(10, "b", null)),
+        Row(2, null))
+      checkAnswer(sql(s"SELECT * FROM $tbl"), expected)
+    }
+  }
+
+  // The source's inner struct type lacks `b`, and the inner struct value (`s.c2`) is null.
+  // Expected: the outer struct is stored with c1 = 20 and a null c2.
+  test("Insert schema evolution: null deeply nested struct with missing field by name") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, " +
+        s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+      val fullInner = new StructType().add("a", IntegerType).add("b", BooleanType)
+      val fullOuter = new StructType().add("c1", IntegerType).add("c2", fullInner)
+      val tableSchema = new StructType().add("id", IntegerType).add("s", fullOuter)
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true)))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      // Source inner struct type has only `a`; its value in the single source row is null.
+      val narrowInner = new StructType().add("a", IntegerType)
+      val narrowOuter = new StructType().add("c1", IntegerType).add("c2", narrowInner)
+      val incomingSchema = new StructType().add("id", IntegerType).add("s", narrowOuter)
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, Row(20, null))))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(tbl, incoming, byName = true)
+      }
+
+      val expected = Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, null)))
+      checkAnswer(sql(s"SELECT * FROM $tbl"), expected)
+    }
+  }
+
+  // The source array holds one struct without c2 plus one null element. Expected: the struct
+  // element gains a null c2 and the null element stays null.
+  test("Insert schema evolution: null struct in array with missing field by name") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, a array<struct<c1:int,c2:boolean>>) USING $v2Format")
+      val fullElem = new StructType().add("c1", IntegerType).add("c2", BooleanType)
+      val tableSchema = new StructType().add("id", IntegerType).add("a", ArrayType(fullElem))
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, true)))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      // Source elements only have c1; the second element is null.
+      val narrowElem = new StructType().add("c1", IntegerType)
+      val incomingSchema = new StructType().add("id", IntegerType).add("a", ArrayType(narrowElem))
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10), null))))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(tbl, incoming, byName = true)
+      }
+
+      val expected = Seq(Row(0, Seq(Row(1, true))), Row(1, Seq(Row(10, null), null)))
+      checkAnswer(sql(s"SELECT * FROM $tbl"), expected)
+    }
+  }
+
+  // Map values in the table are struct<c1,c2>; source map values only have c1. With
+  // `byName = true`, the inserted map value is expected to carry a null c2.
+  test("Insert schema evolution: source missing field in struct nested in map value by name") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, " +
+        s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+      val fullValue = new StructType().add("c1", IntegerType).add("c2", BooleanType)
+      val tableSchema = new StructType()
+        .add("id", IntegerType)
+        .add("m", MapType(StringType, fullValue))
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true)))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      // Source map values omit c2.
+      val narrowValue = new StructType().add("c1", IntegerType)
+      val incomingSchema = new StructType()
+        .add("id", IntegerType)
+        .add("m", MapType(StringType, narrowValue))
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10)))))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(tbl, incoming, byName = true)
+      }
+
+      val expected = Seq(
+        Row(0, Map("x" -> Row(1, true))),
+        Row(1, Map("y" -> Row(10, null))))
+      checkAnswer(sql(s"SELECT * FROM $tbl"), expected)
+    }
+  }
+
+  // Same map-of-struct scenario as the by-name variant, but the insert is issued without
+  // `byName`; the inserted map value is still expected to carry a null c2.
+  test("Insert schema evolution: source missing field in struct nested in map value by position") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, " +
+        s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+      val fullValue = new StructType().add("c1", IntegerType).add("c2", BooleanType)
+      val tableSchema = new StructType()
+        .add("id", IntegerType)
+        .add("m", MapType(StringType, fullValue))
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true)))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      // Source map values omit c2.
+      val narrowValue = new StructType().add("c1", IntegerType)
+      val incomingSchema = new StructType()
+        .add("id", IntegerType)
+        .add("m", MapType(StringType, narrowValue))
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10)))))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(tbl, incoming)
+      }
+
+      val expected = Seq(
+        Row(0, Map("x" -> Row(1, true))),
+        Row(1, Map("y" -> Row(10, null))))
+      checkAnswer(sql(s"SELECT * FROM $tbl"), expected)
+    }
+  }
+
+  test("Insert schema evolution: extra and missing top-level column by name") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, salary int, dep string) USING $v2Format")
+      doInsert(tbl, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // The source still has three columns, but it brings a new "active" column and omits
+      // "salary". By-name resolution is expected to add "active" through schema evolution
+      // and to store null for the omitted "salary".
+      withInsertNestedTypeCoercion {
+        val incoming = Seq((1, "engineering", true)).toDF("id", "dep", "active")
+        doInsertWithSchemaEvolution(tbl, incoming, byName = true)
+      }
+
+      val expected = Seq(Row(0, 100, "sales", null), Row(1, null, "engineering", true))
+      checkAnswer(sql(s"SELECT id, salary, dep, active FROM $tbl"), expected)
+    }
+  }
+
+  test("Insert schema evolution: extra and missing nested struct field by name") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val tableStruct = new StructType()
+        .add("c1", IntegerType)
+        .add("c2", StringType)
+        .add("c3", BooleanType)
+      val tableSchema = new StructType().add("id", IntegerType).add("s", tableStruct)
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      // The source struct also has three fields, but it brings a new "c4" and omits "c3".
+      // By-name resolution is expected to add "c4" through schema evolution and to store
+      // null for the omitted "c3".
+      val sourceStruct = new StructType()
+        .add("c1", IntegerType)
+        .add("c2", StringType)
+        .add("c4", DoubleType)
+      val incomingSchema = new StructType().add("id", IntegerType).add("s", sourceStruct)
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b", 3.14))))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(tbl, incoming, byName = true)
+      }
+
+      val projected = sql(s"SELECT id, s.c1, s.c2, s.c3, s.c4 FROM $tbl")
+      checkAnswer(projected, Seq(Row(0, 1, "a", true, null), Row(1, 10, "b", null, 3.14)))
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Negative tests: inserts with missing columns/fields that are expected to fail
+  // (no schema evolution, coercion flag not enabled, or null-fill for missing
+  // defaults disabled)
+  // ---------------------------------------------------------------------------
+
+  test("Insert without evolution: source missing top-level column by name fails") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, salary int, dep string) USING $v2Format")
+      doInsert(tbl, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // `salary` has no explicit DEFAULT, so a by-name insert that omits it only errors once
+      // null-fill for missing defaults is switched off; otherwise FILL mode would insert
+      // null for `salary`.
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+        val failure = intercept[AnalysisException] {
+          doInsertByName(tbl, Seq((1, "engineering")).toDF("id", "dep"))
+        }
+        checkError(
+          exception = failure,
+          condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          parameters = Map("tableName" -> toSQLId(tbl), "colName" -> "`salary`"))
+      }
+    }
+  }
+
+  // The source supplies only (id, salary) and the insert is issued without `byName`. With
+  // null-fill for missing defaults disabled, the trailing `dep` column is reported as missing.
+  test("Insert schema evolution: source missing top-level column by position fails " +
+      "when null default disabled and column has no explicit DEFAULT") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, salary int, dep string) USING $v2Format")
+      doInsert(tbl, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+        withInsertNestedTypeCoercion {
+          val failure = intercept[AnalysisException] {
+            doInsertWithSchemaEvolution(tbl, Seq((1, 200)).toDF("id", "salary"))
+          }
+          checkError(
+            exception = failure,
+            condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+            parameters = Map("tableName" -> toSQLId(tbl), "colName" -> "`dep`"))
+        }
+      }
+    }
+  }
+
+  // The source struct lacks c3 and the insert is issued without `byName`. With null-fill for
+  // missing defaults disabled, the nested field `s`.`c3` is reported as missing.
+  test("Insert schema evolution: source missing nested struct field by position fails " +
+      "when null default disabled") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val fullStruct = new StructType()
+        .add("c1", IntegerType)
+        .add("c2", StringType)
+        .add("c3", BooleanType)
+      val tableSchema = new StructType().add("id", IntegerType).add("s", fullStruct)
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      val narrowStruct = new StructType().add("c1", IntegerType).add("c2", StringType)
+      val incomingSchema = new StructType().add("id", IntegerType).add("s", narrowStruct)
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b"))))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+        withInsertNestedTypeCoercion {
+          val failure = intercept[AnalysisException] {
+            doInsertWithSchemaEvolution(tbl, incoming)
+          }
+          checkError(
+            exception = failure,
+            condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+            parameters = Map("tableName" -> toSQLId(tbl), "colName" -> "`s`.`c3`"))
+        }
+      }
+    }
+  }
+
+  // A plain `doInsert` with two columns into a three-column table is expected to be rejected
+  // with an arity mismatch that lists both the table columns and the data columns.
+  test("Insert without evolution: source missing top-level column by position fails") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, salary int, dep string) USING $v2Format")
+      doInsert(tbl, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      val failure = intercept[AnalysisException] {
+        doInsert(tbl, Seq((1, 200)).toDF("id", "salary"))
+      }
+      checkError(
+        exception = failure,
+        condition = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> toSQLId(tbl),
+          "tableColumns" -> "`id`, `salary`, `dep`",
+          "dataColumns" -> "`id`, `salary`"))
+    }
+  }
+
+  // A `doInsertByName` call (no schema evolution) whose struct lacks c3 is expected to fail
+  // with CANNOT_FIND_DATA for `s`.`c3`.
+  test("Insert without evolution: source missing nested struct field by name fails") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val fullStruct = new StructType()
+        .add("c1", IntegerType)
+        .add("c2", StringType)
+        .add("c3", BooleanType)
+      val tableSchema = new StructType().add("id", IntegerType).add("s", fullStruct)
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      val narrowStruct = new StructType().add("c1", IntegerType).add("c2", StringType)
+      val incomingSchema = new StructType().add("id", IntegerType).add("s", narrowStruct)
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b"))))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      val failure = intercept[AnalysisException] {
+        doInsertByName(tbl, incoming)
+      }
+      checkError(
+        exception = failure,
+        condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        parameters = Map("tableName" -> toSQLId(tbl), "colName" -> "`s`.`c3`"))
+    }
+  }
+
+  // The evolving insert is issued outside `withInsertNestedTypeCoercion`, and the source
+  // struct lacks c3; the insert is expected to fail with CANNOT_FIND_DATA for `s`.`c3`.
+  test("Insert with evolution but without coercion flag:" +
+      " source missing nested struct field by name fails") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val fullStruct = new StructType()
+        .add("c1", IntegerType)
+        .add("c2", StringType)
+        .add("c3", BooleanType)
+      val tableSchema = new StructType().add("id", IntegerType).add("s", fullStruct)
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      val narrowStruct = new StructType().add("c1", IntegerType).add("c2", StringType)
+      val incomingSchema = new StructType().add("id", IntegerType).add("s", narrowStruct)
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b"))))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      val failure = intercept[AnalysisException] {
+        doInsertWithSchemaEvolution(tbl, incoming, byName = true)
+      }
+      checkError(
+        exception = failure,
+        condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        parameters = Map("tableName" -> toSQLId(tbl), "colName" -> "`s`.`c3`"))
+    }
+  }
+
+  // A plain `doInsert` whose struct lacks c3 is expected to fail with STRUCT_MISSING_FIELDS,
+  // naming column `s` and the missing field `c3`.
+  test("Insert without evolution: source missing nested struct field by position fails") {
+    val tbl = s"${catalogAndNamespace}tbl"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val fullStruct = new StructType()
+        .add("c1", IntegerType)
+        .add("c2", StringType)
+        .add("c3", BooleanType)
+      val tableSchema = new StructType().add("id", IntegerType).add("s", fullStruct)
+      val seedRows = spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true))))
+      doInsert(tbl, spark.createDataFrame(seedRows, tableSchema))
+
+      val narrowStruct = new StructType().add("c1", IntegerType).add("c2", StringType)
+      val incomingSchema = new StructType().add("id", IntegerType).add("s", narrowStruct)
+      val incomingRows = spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b"))))
+      val incoming = spark.createDataFrame(incomingRows, incomingSchema)
+      val failure = intercept[AnalysisException] {
+        doInsert(tbl, incoming)
+      }
+      checkError(
+        exception = failure,
+        condition = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+        parameters = Map(
+          "tableName" -> toSQLId(tbl),
+          "colName" -> "`s`",
+          "missingFields" -> "`c3`"))
+    }
+  }
 }
diff --git 
a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
 
b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
index 2aa6cb885ca3..36fda2b50688 100644
--- 
a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
+++ 
b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
@@ -659,6 +659,7 @@ spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio
 spark.sql.inMemoryColumnarStorage.hugeVectorThreshold
 spark.sql.inMemoryColumnarStorage.partitionPruning
 spark.sql.inMemoryTableScanStatistics.enable
+spark.sql.insertNestedTypeCoercion.enabled
 spark.sql.join.preferSortMergeJoin
 spark.sql.json.enableExactStringParsing
 spark.sql.json.enablePartialResults


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to