This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 745bffc36b06 [SPARK-56550][SQL] Support source with fewer columns/fields in INSERT INTO WITH SCHEMA EVOLUTION
745bffc36b06 is described below

commit 745bffc36b063458bc91c24328f60cec4d3985d3
Author: Szehon Ho <[email protected]>
AuthorDate: Wed May 20 02:11:30 2026 +0800

    [SPARK-56550][SQL] Support source with fewer columns/fields in INSERT INTO WITH SCHEMA EVOLUTION
    
    ### What changes were proposed in this pull request?
    
    Add support for `INSERT INTO ... WITH SCHEMA EVOLUTION` to fill missing nested struct fields with null (or column defaults) when the source has fewer fields than the target table. This mirrors the existing `MERGE INTO` behavior gated by `spark.sql.mergeNestedTypeCoercion.enabled`.
    
    **Specific changes:**
    
    1. **New config flag**: `spark.sql.insertNestedTypeCoercion.enabled` (internal, default `false`), which mirrors the existing `spark.sql.mergeNestedTypeCoercion.enabled` for MERGE.
    
    2. **Refactored `TableOutputResolver.resolveOutputColumns`**: Replaced two overlapping boolean parameters (`supportColDefaultValue`, `fillNestedDefaults`) with a single `DefaultValueFillMode` enum (`NONE`, `FILL`, `RECURSE`), making the API cleaner and the intent explicit at each call site.
    
    3. **RECURSE mode for V2 inserts**: When both schema evolution and the coercion flag are enabled, `RECURSE` mode fills missing nested struct fields with null, relaxes the by-position arity check, and recurses into structs nested within arrays and maps.
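    
    The by-name behavior of the three fill modes can be pictured with a small, self-contained toy model. This is only a sketch: `DType`, `Datum`, `FillMode`, and `FillSketch.align` are hypothetical names invented for illustration and are not Spark's Catalyst classes. It also fills with null only, ignores column DEFAULT values, and does not model maps or the by-position path.
    
    ```scala
    // Toy model only: none of these types exist in Spark.
    sealed trait DType
    case object IntT extends DType
    case object StrT extends DType
    final case class StructT(fields: Seq[(String, DType)]) extends DType
    final case class ArrayT(element: DType) extends DType
    
    sealed trait Datum
    case object NullD extends Datum
    final case class IntD(v: Int) extends Datum
    final case class StrD(v: String) extends Datum
    final case class StructD(fields: Map[String, Datum]) extends Datum
    final case class ArrayD(elements: Seq[Datum]) extends Datum
    
    // Stand-in for the NONE / FILL / RECURSE modes described above.
    object FillMode extends Enumeration {
      val NONE, FILL, RECURSE = Value
    }
    
    object FillSketch {
      /**
       * Aligns `value` to `target` by field name. Returns Left(path) for the
       * first missing field that the mode does not allow to be filled.
       */
      def align(
          value: Datum,
          target: DType,
          mode: FillMode.Value,
          path: Seq[String] = Nil,
          topLevel: Boolean = true): Either[String, Datum] = (value, target) match {
        case (StructD(fields), StructT(expected)) =>
          // FILL only fills at the top level; RECURSE fills at every depth.
          val canFill = mode == FillMode.RECURSE || (mode == FillMode.FILL && topLevel)
          val zero: Either[String, Vector[(String, Datum)]] = Right(Vector.empty)
          val aligned = expected.foldLeft(zero) { case (acc, (name, fieldType)) =>
            acc.flatMap { done =>
              val fieldPath = path :+ name
              fields.get(name) match {
                case Some(v) =>
                  align(v, fieldType, mode, fieldPath, topLevel = false)
                    .map(d => done :+ (name -> d))
                case None if canFill => Right(done :+ (name -> NullD))
                case None => Left(fieldPath.mkString("."))
              }
            }
          }
          aligned.map(pairs => StructD(pairs.toMap))
        case (ArrayD(elems), ArrayT(elemType)) =>
          // Recurse into each element so structs inside arrays are aligned too.
          val zero: Either[String, Vector[Datum]] = Right(Vector.empty)
          val aligned = elems.foldLeft(zero) { (acc, e) =>
            for {
              done <- acc
              d <- align(e, elemType, mode, path :+ "element", topLevel = false)
            } yield done :+ d
          }
          aligned.map(ds => ArrayD(ds))
        case (other, _) =>
          // Leaves and null structs pass through unchanged.
          Right(other)
      }
    }
    
    object FillSketchDemo {
      def main(args: Array[String]): Unit = {
        val target = StructT(Seq(
          "id" -> IntT,
          "s" -> StructT(Seq("c1" -> IntT, "c2" -> StrT, "c3" -> IntT))))
        val row = StructD(Map(
          "id" -> IntD(1),
          "s" -> StructD(Map("c1" -> IntD(10), "c2" -> StrD("b")))))
        // RECURSE fills the missing nested field s.c3 with NullD.
        println(FillSketch.align(row, target, FillMode.RECURSE))
        // FILL reports the missing nested field instead of filling it.
        println(FillSketch.align(row, target, FillMode.FILL))
      }
    }
    ```
    
    In this model, `FILL` returns `Left("s.c3")` for the sample row because the missing field is nested, while `RECURSE` returns the row with `c3` set to `NullD`.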
    
    **Supported scenarios** (source has fewer columns/fields than target, with schema evolution + coercion flag):
    
    | Scenario | Before | After (+ coercion flag) |
    |---|---|---|
    | Missing top-level column (by name) | fill with default/null | same (unchanged) |
    | Missing top-level column with DEFAULT (by name) | fill with default value | same (unchanged) |
    | Missing top-level column (by position) | error | fill trailing with default/null |
    | Missing top-level column with DEFAULT (by position) | error | fill trailing with default value |
    | Missing nested struct field (by name) | error | fill with null |
    | Missing nested struct field (by position) | error | fill with null |
    | Missing field in struct inside array (by name) | error | fill with null |
    | Missing field in struct inside map value (by name) | error | fill with null |
    | Missing deeply nested struct field (by name) | error | fill with null |
    
    ### Why are the changes needed?
    
    `MERGE INTO` already supports coercing nested types when the source has fewer struct fields than the target (via `spark.sql.mergeNestedTypeCoercion.enabled`). `INSERT INTO WITH SCHEMA EVOLUTION` lacked this capability, causing errors for legitimate use cases where the source schema is a subset of the target schema at the nested level.
    
    This is important for schema evolution workflows where tables accumulate new nested fields over time, but older data sources may not have all fields populated.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. When `spark.sql.insertNestedTypeCoercion.enabled` is set to `true` (default `false`), `INSERT INTO ... WITH SCHEMA EVOLUTION` will no longer fail when the source has fewer nested struct fields than the target. Instead, missing fields are filled with null. By-position inserts with fewer top-level columns than the target also stop failing: the trailing columns are filled with their default value (or null). This is gated behind an internal, experimental config flag.
    
    ### How was this patch tested?
    
    Added 17 new test cases in `InsertIntoSchemaEvolutionTests`:
    
    **Positive tests** (with schema evolution + coercion flag):
    - Missing top-level column by name / by position
    - Missing top-level column with DEFAULT value by name / by position
    - Missing nested struct field by name / by position
    - Missing field in struct nested in array / map value
    - Missing deeply nested struct field
    - Null struct with missing field by name / by position
    - Mixed null and non-null structs with missing field
    - Null deeply nested struct with missing field
    - Null struct in array with missing field
    
    **Negative tests** (verifying errors when coercion is disabled):
    - Missing top-level column by name / by position (without evolution)
    - Missing nested struct field by name / by position (without evolution)
    - Missing nested struct field with evolution but without coercion flag
    
    All 64 matched tests pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Cursor (Claude Opus 4)
    
    Closes #55427 from szehon-ho/insert-schema-evolution-missing-fields.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 558ea1ce32aba863c7eb476cfb5020aa227170db)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  10 +-
 .../catalyst/analysis/TableOutputResolver.scala    | 124 +++-
 .../org/apache/spark/sql/internal/SQLConf.scala    |  15 +
 .../spark/sql/execution/datasources/rules.scala    |   2 +-
 .../spark/sql/connector/InsertIntoTests.scala      | 738 +++++++++++++++++++++
 .../configs-without-binding-policy-exceptions      |   1 +
 6 files changed, 853 insertions(+), 37 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 09c6a0984258..faa78e030636 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkException, 
SparkUnsupportedOperationException}
 import org.apache.spark.internal.config.ConfigBindingPolicy
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
+import 
org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode._
 import org.apache.spark.sql.catalyst.analysis.resolver.{
   AnalyzerBridgeState,
   HybridAnalyzer,
@@ -3788,9 +3789,16 @@ class Analyzer(
         validateStoreAssignmentPolicy()
         TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
           expected = v2Write.table.output, queryOutput = v2Write.query.output)
+        // With schema evolution + coercion flag, missing top-level columns 
AND missing nested
+        // struct fields are filled with defaults/null (RECURSE mode). 
Otherwise, only missing
+        // top-level columns are filled via FILL mode; missing nested struct 
fields still cause
+        // schema enforcement errors.
+        val defaultValueFillMode =
+          if (conf.coerceInsertNestedTypes && v2Write.schemaEvolutionEnabled) 
RECURSE
+          else FILL
         val projection = TableOutputResolver.resolveOutputColumns(
           v2Write.table.name, v2Write.table.output, v2Write.query, 
v2Write.isByName, conf,
-          supportColDefaultValue = true)
+          defaultValueFillMode)
         if (projection != v2Write.query) {
           val cleanedTable = v2Write.table match {
             case r: DataSourceV2Relation =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 7eacc5ab9b2a..d691c449733f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -41,9 +41,10 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
 
   /**
    * Modes for filling in default or null values for missing columns.
-   * If FILL, fill missing top-level columns with their default values.
-   * If RECURSE, fill missing top-level columns and also recurse into nested 
struct
-   * fields to fill null.
+   * If FILL, fill missing top-level columns with their default values 
(by-name reorder path).
+   * If RECURSE, fill missing top-level columns (including trailing columns on 
the by-position
+   * path for INSERT with schema evolution when enabled) and recurse into 
nested structs,
+   * arrays, and maps to fill missing struct fields with null or defaults.
    * If NONE, do not fill any missing columns.
    */
   object DefaultValueFillMode extends Enumeration {
@@ -92,19 +93,22 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       query: LogicalPlan,
       byName: Boolean,
       conf: SQLConf,
-      supportColDefaultValue: Boolean = false): LogicalPlan = {
+      defaultValueFillMode: DefaultValueFillMode.Value = NONE): LogicalPlan = {
 
     if (expected.size < query.output.size) {
       throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
         tableName, expected.map(_.name), query.output)
     }
 
+    // In RECURSE mode, allow fewer source columns than target by filling 
trailing columns
+    // with defaults. In other modes, a column count mismatch in by-position 
resolution is
+    // an error.
+    val fillDefaultValue = defaultValueFillMode == RECURSE
     val errors = new mutable.ArrayBuffer[String]()
     val resolved: Seq[NamedExpression] = if (byName) {
-      // If a top-level column does not have a corresponding value in the 
input query, fill with
-      // the column's default value. We need to pass `fillDefaultValue` as 
FILL here, if the
-      // `supportColDefaultValue` parameter is also true.
-      val defaultValueFillMode = if (supportColDefaultValue) FILL else NONE
+      // By-name resolution: the defaultValueFillMode is passed through to 
control whether
+      // missing top-level columns are filled (FILL/RECURSE) and whether 
missing nested
+      // struct fields are also filled (RECURSE only).
       reorderColumnsByName(
         tableName,
         query.output,
@@ -112,13 +116,15 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
         conf,
         errors += _,
         Nil,
-        defaultValueFillMode)
+        defaultValueFillMode,
+        enforceFullOutput = true)
     } else {
-      if (expected.size > query.output.size) {
+      if (expected.size > query.output.size && !fillDefaultValue) {
         throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
           tableName, expected.map(_.name), query.output)
       }
-      resolveColumnsByPosition(tableName, query.output, expected, conf, errors 
+= _)
+      resolveColumnsByPosition(
+        tableName, query.output, expected, conf, errors += _, fillDefaultValue 
= fillDefaultValue)
     }
 
     if (errors.nonEmpty) {
@@ -157,17 +163,17 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       case (valueType: StructType, colType: StructType) =>
         val resolvedValue = resolveStructType(
           tableName, value, valueType, col, colType,
-          byName = true, conf, addError, colPath, fillChildDefaultValue)
+          byName = true, conf, addError, colPath, fillChildDefaultValue, 
enforceFullOutput = false)
         resolvedValue.getOrElse(value)
       case (valueType: ArrayType, colType: ArrayType) =>
         val resolvedValue = resolveArrayType(
           tableName, value, valueType, col, colType,
-          byName = true, conf, addError, colPath, fillChildDefaultValue)
+          byName = true, conf, addError, colPath, fillChildDefaultValue, 
enforceFullOutput = false)
         resolvedValue.getOrElse(value)
       case (valueType: MapType, colType: MapType) =>
         val resolvedValue = resolveMapType(
           tableName, value, valueType, col, colType,
-          byName = true, conf, addError, colPath, fillChildDefaultValue)
+          byName = true, conf, addError, colPath, fillChildDefaultValue, 
enforceFullOutput = false)
         resolvedValue.getOrElse(value)
       case _ =>
         checkUpdate(tableName, value, col, conf, addError, colPath)
@@ -304,7 +310,8 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       conf: SQLConf,
       addError: String => Unit,
       colPath: Seq[String] = Nil,
-      defaultValueFillMode: DefaultValueFillMode.Value): Seq[NamedExpression] 
= {
+      defaultValueFillMode: DefaultValueFillMode.Value,
+      enforceFullOutput: Boolean = false): Seq[NamedExpression] = {
     val matchedCols = mutable.HashSet.empty[String]
     val reordered = expectedCols.flatMap { expectedCol =>
       val matched = inputCols.filter(col => conf.resolver(col.name, 
expectedCol.name))
@@ -336,15 +343,15 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
           case (matchedType: StructType, expectedType: StructType) =>
             resolveStructType(
               tableName, matchedCol, matchedType, actualExpectedCol, 
expectedType,
-              byName = true, conf, addError, newColPath, childFillDefaultValue)
+              byName = true, conf, addError, newColPath, 
childFillDefaultValue, enforceFullOutput)
           case (matchedType: ArrayType, expectedType: ArrayType) =>
             resolveArrayType(
               tableName, matchedCol, matchedType, actualExpectedCol, 
expectedType,
-              byName = true, conf, addError, newColPath, childFillDefaultValue)
+              byName = true, conf, addError, newColPath, 
childFillDefaultValue, enforceFullOutput)
           case (matchedType: MapType, expectedType: MapType) =>
             resolveMapType(
               tableName, matchedCol, matchedType, actualExpectedCol, 
expectedType,
-              byName = true, conf, addError, newColPath, childFillDefaultValue)
+              byName = true, conf, addError, newColPath, 
childFillDefaultValue, enforceFullOutput)
           case _ =>
             checkField(
               tableName, actualExpectedCol, matchedCol, byName = true, conf, 
addError, newColPath)
@@ -366,6 +373,11 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       } else {
         reordered
       }
+    } else if (enforceFullOutput) {
+      val colName =
+        if (colPath.nonEmpty) colPath.quoted
+        else expectedCols.map(_.name).map(toSQLId).mkString(", ")
+      throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, 
colName)
     } else {
       Nil
     }
@@ -377,7 +389,8 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       expectedCols: Seq[Attribute],
       conf: SQLConf,
       addError: String => Unit,
-      colPath: Seq[String] = Nil): Seq[NamedExpression] = {
+      colPath: Seq[String] = Nil,
+      fillDefaultValue: Boolean = false): Seq[NamedExpression] = {
     val actualExpectedCols = expectedCols.map { attr =>
       attr.withDataType { 
CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType) }
     }
@@ -393,7 +406,7 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
           tableName, colPath.quoted, extraColsStr
         )
       }
-    } else if (inputCols.size < actualExpectedCols.size) {
+    } else if (inputCols.size < actualExpectedCols.size && !fillDefaultValue) {
       val missingColsStr = 
actualExpectedCols.takeRight(actualExpectedCols.size - inputCols.size)
         .map(col => toSQLId(col.name))
         .mkString(", ")
@@ -407,25 +420,48 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       }
     }
 
-    inputCols.zip(actualExpectedCols).flatMap { case (inputCol, expectedCol) =>
+    val matched = inputCols.zip(actualExpectedCols).flatMap { case (inputCol, 
expectedCol) =>
       val newColPath = colPath :+ expectedCol.name
       (inputCol.dataType, expectedCol.dataType) match {
         case (inputType: StructType, expectedType: StructType) =>
           resolveStructType(
             tableName, inputCol, inputType, expectedCol, expectedType,
-            byName = false, conf, addError, newColPath, fillDefaultValue = 
false)
+            byName = false, conf, addError, newColPath, fillDefaultValue, 
enforceFullOutput = true)
         case (inputType: ArrayType, expectedType: ArrayType) =>
           resolveArrayType(
             tableName, inputCol, inputType, expectedCol, expectedType,
-            byName = false, conf, addError, newColPath, fillDefaultValue = 
false)
+            byName = false, conf, addError, newColPath, fillDefaultValue, 
enforceFullOutput = true)
         case (inputType: MapType, expectedType: MapType) =>
           resolveMapType(
             tableName, inputCol, inputType, expectedCol, expectedType,
-            byName = false, conf, addError, newColPath, fillDefaultValue = 
false)
+            byName = false, conf, addError, newColPath, fillDefaultValue, 
enforceFullOutput = true)
         case _ =>
           checkField(tableName, expectedCol, inputCol, byName = false, conf, 
addError, newColPath)
       }
     }
+
+    val defaults = if (fillDefaultValue) {
+      actualExpectedCols.drop(inputCols.size).map { expectedCol =>
+        val defaultExpr = getDefaultValueExprOrNullLit(
+          expectedCol, conf.useNullsForMissingDefaultColumnValues)
+        if (defaultExpr.isEmpty) {
+          throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(
+            tableName, (colPath :+ expectedCol.name).quoted)
+        }
+        applyColumnMetadata(defaultExpr.get, expectedCol)
+      }
+    } else {
+      Nil
+    }
+
+    val result = matched ++ defaults
+    if (result.length != actualExpectedCols.size) {
+      val colName =
+        if (colPath.nonEmpty) colPath.quoted
+        else actualExpectedCols.map(_.name).map(toSQLId).mkString(", ")
+      throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, 
colName)
+    }
+    result
   }
 
   private[sql] def checkNullability(
@@ -447,6 +483,7 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
     input.nullable && !attr.nullable && conf.storeAssignmentPolicy != 
StoreAssignmentPolicy.LEGACY
   }
 
+  // scalastyle:off argcount
   private def resolveStructType(
       tableName: String,
       input: Expression,
@@ -457,7 +494,8 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       conf: SQLConf,
       addError: String => Unit,
       colPath: Seq[String],
-      fillDefaultValue: Boolean): Option[NamedExpression] = {
+      fillDefaultValue: Boolean,
+      enforceFullOutput: Boolean): Option[NamedExpression] = {
     val nullCheckedInput = checkNullability(input, expected, conf, colPath)
     val fields = inputType.zipWithIndex.map { case (f, i) =>
       Alias(GetStructField(nullCheckedInput, i, Some(f.name)), f.name)()
@@ -465,10 +503,10 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
     val defaultValueMode = if (fillDefaultValue) RECURSE else NONE
     val resolved = if (byName) {
       reorderColumnsByName(tableName, fields, toAttributes(expectedType), 
conf, addError, colPath,
-        defaultValueMode)
+        defaultValueMode, enforceFullOutput)
     } else {
       resolveColumnsByPosition(
-        tableName, fields, toAttributes(expectedType), conf, addError, colPath)
+        tableName, fields, toAttributes(expectedType), conf, addError, 
colPath, fillDefaultValue)
     }
     if (resolved.length == expectedType.length) {
       val struct = CreateStruct(resolved)
@@ -478,6 +516,11 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
         struct
       }
       Some(applyColumnMetadata(res, expected))
+    } else if (enforceFullOutput) {
+      val colName =
+        if (colPath.nonEmpty) colPath.quoted
+        else expectedType.fields.map(_.name).map(toSQLId).mkString(", ")
+      throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, 
colName)
     } else {
       None
     }
@@ -493,7 +536,8 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       conf: SQLConf,
       addError: String => Unit,
       colPath: Seq[String],
-      fillDefaultValue: Boolean): Option[NamedExpression] = {
+      fillDefaultValue: Boolean,
+      enforceFullOutput: Boolean): Option[NamedExpression] = {
     val nullCheckedInput = checkNullability(input, expected, conf, colPath)
     val param = NamedLambdaVariable("element", inputType.elementType, 
inputType.containsNull)
     val fakeAttr =
@@ -501,9 +545,10 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
     val res = if (byName) {
       val defaultValueMode = if (fillDefaultValue) RECURSE else NONE
       reorderColumnsByName(tableName, Seq(param), Seq(fakeAttr), conf, 
addError, colPath,
-        defaultValueMode)
+        defaultValueMode, enforceFullOutput)
     } else {
-      resolveColumnsByPosition(tableName, Seq(param), Seq(fakeAttr), conf, 
addError, colPath)
+      resolveColumnsByPosition(
+        tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath, 
fillDefaultValue)
     }
     if (res.length == 1) {
       val castedArray =
@@ -515,6 +560,9 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
           ArrayTransform(nullCheckedInput, func)
         }
       Some(applyColumnMetadata(castedArray, expected))
+    } else if (enforceFullOutput) {
+      val colName = if (colPath.nonEmpty) colPath.quoted else 
toSQLId(expected.name)
+      throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, 
colName)
     } else {
       None
     }
@@ -530,7 +578,8 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       conf: SQLConf,
       addError: String => Unit,
       colPath: Seq[String],
-      fillDefaultValue: Boolean): Option[NamedExpression] = {
+      fillDefaultValue: Boolean,
+      enforceFullOutput: Boolean): Option[NamedExpression] = {
     val nullCheckedInput = checkNullability(input, expected, conf, colPath)
 
     val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable = 
false)
@@ -538,9 +587,10 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
     val defaultValueFillMode = if (fillDefaultValue) RECURSE else NONE
     val resKey = if (byName) {
       reorderColumnsByName(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, 
addError, colPath,
-        defaultValueFillMode)
+        defaultValueFillMode, enforceFullOutput)
     } else {
-      resolveColumnsByPosition(tableName, Seq(keyParam), Seq(fakeKeyAttr), 
conf, addError, colPath)
+      resolveColumnsByPosition(
+        tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath, 
fillDefaultValue)
     }
 
     val valueParam =
@@ -549,10 +599,10 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       AttributeReference("value", expectedType.valueType, 
expectedType.valueContainsNull)()
     val resValue = if (byName) {
       reorderColumnsByName(tableName, Seq(valueParam), Seq(fakeValueAttr), 
conf, addError, colPath,
-        defaultValueFillMode)
+        defaultValueFillMode, enforceFullOutput)
     } else {
       resolveColumnsByPosition(
-        tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, 
colPath)
+        tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, 
colPath, fillDefaultValue)
     }
 
     if (resKey.length == 1 && resValue.length == 1) {
@@ -577,10 +627,14 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
           MapFromArrays(newKeys, newValues)
         }
       Some(applyColumnMetadata(casted, expected))
+    } else if (enforceFullOutput) {
+      val colName = if (colPath.nonEmpty) colPath.quoted else 
toSQLId(expected.name)
+      throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName, 
colName)
     } else {
       None
     }
   }
+  // scalastyle:on argcount
 
   // For table insertions, capture the overflow errors and show proper message.
   // Without this method, the overflow errors of castings will show hints for 
turning off ANSI SQL
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f2cad7bb48c9..57f39ad72355 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -7273,6 +7273,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val INSERT_INTO_NESTED_TYPE_COERCION_ENABLED =
+    buildConf("spark.sql.insertNestedTypeCoercion.enabled")
+      .internal()
+      .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill 
missing nested " +
+        "struct fields with null when the source has fewer nested fields than 
the target " +
+        "table. Also relaxes by-position column-count enforcement so trailing 
missing " +
+        "top-level columns are filled with their default value (or null). This 
is " +
+        "experimental and the semantics may change.")
+      .version("4.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val TIME_TYPE_ENABLED =
     buildConf("spark.sql.timeType.enabled")
       .internal()
@@ -8600,6 +8612,9 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
   def coerceMergeNestedTypes: Boolean =
     getConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED)
 
+  def coerceInsertNestedTypes: Boolean =
+    getConf(SQLConf.INSERT_INTO_NESTED_TYPE_COERCION_ENABLED)
+
   def isTimeTypeEnabled: Boolean = getConf(SQLConf.TIME_TYPE_ENABLED)
 
   def listaggAllowDistinctCastWithOrder: Boolean = 
getConf(LISTAGG_ALLOW_DISTINCT_CAST_WITH_ORDER)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index ce41bbe4aeb3..7122dd52ef1a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -528,7 +528,7 @@ object PreprocessTableInsertion extends 
ResolveInsertionBase {
         query,
         byName,
         conf,
-        supportColDefaultValue = true)
+        TableOutputResolver.DefaultValueFillMode.FILL)
     } catch {
       case e: AnalysisException if staticPartCols.nonEmpty &&
         (e.getCondition == 
"INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" ||
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
index 4f023136a6fe..42017c2dd60e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
@@ -574,6 +574,26 @@ trait InsertIntoSchemaEvolutionTests { this: 
InsertIntoTests =>
       byName: Boolean = false,
       replaceWhere: Option[String] = None): Unit
 
+  /** Insert data into a table by name without schema evolution. */
+  protected def doInsertByName(
+      tableName: String,
+      insert: DataFrame,
+      mode: SaveMode = SaveMode.Append): Unit = {
+    val tmpView = "tmp_view"
+    withTempView(tmpView) {
+      insert.createOrReplaceTempView(tmpView)
+      val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO"
+      sql(s"INSERT $overwrite TABLE $tableName BY NAME SELECT * FROM $tmpView")
+    }
+  }
+
+  /** Run a block with INSERT nested type coercion enabled. */
+  protected def withInsertNestedTypeCoercion(f: => Unit): Unit = {
+    withSQLConf(SQLConf.INSERT_INTO_NESTED_TYPE_COERCION_ENABLED.key -> 
"true") {
+      f
+    }
+  }
+
   test("Insert schema evolution: extra column - no auto-schema-evolution 
capability") {
     val t1 = s"${catalogAndNamespace}tbl"
     withTable(t1) {
@@ -1403,4 +1423,722 @@ trait InsertIntoSchemaEvolutionTests { this: 
InsertIntoTests =>
       assert(spark.table(t1).schema("id").dataType === IntegerType)
     }
   }
+
+  // 
---------------------------------------------------------------------------
+  // Tests for source with fewer columns/fields than target
+  // 
---------------------------------------------------------------------------
+
+  test("Insert schema evolution: source missing top-level column by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("salary", IntegerType),
+        StructField("dep", StringType)))
+      val data = Seq(Row(0, 100, "sales"))
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, spark.createDataFrame(spark.sparkContext.parallelize(data), 
schema))
+      doInsertWithSchemaEvolution(t1,
+        Seq((1, "engineering")).toDF("id", "dep"),
+        byName = true)
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, null, "engineering")))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column by position") 
{
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // By position: source col 1 maps to target col 1, source col 2 maps to 
target col 2,
+      // trailing target col 3 is filled with null.
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, 200)).toDF("id", "salary"))
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, null)))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column with DEFAULT 
by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int DEFAULT 200, dep string) 
USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      doInsertWithSchemaEvolution(t1,
+        Seq((1, "engineering")).toDF("id", "dep"),
+        byName = true)
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, "engineering")))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column with DEFAULT 
by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string DEFAULT 
'unknown') USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, 200)).toDF("id", "salary"))
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, "unknown")))
+    }
+  }
+
+  test("Insert schema evolution: source missing nested struct field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) 
USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), 
targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+    }
+  }
+
+  test("Insert schema evolution: source missing nested struct field by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in array by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in array by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+    }
+  }
+
+  test("Insert schema evolution: source missing deeply nested struct field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType),
+            StructField("b", BooleanType)))))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType)))))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(20, Row(30))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, Row(30, null)))))
+    }
+  }
+
+  test("Insert schema evolution: source with null struct and missing nested field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", IntegerType))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, null))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+    }
+  }
+
+  test("Insert schema evolution: source with null struct and missing nested field by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", IntegerType))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, null))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+    }
+  }
+
+  test("Insert schema evolution: mixed null and non-null structs with missing field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")), Row(2, null))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)), Row(2, null)))
+    }
+  }
+
+  test("Insert schema evolution: null deeply nested struct with missing field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType),
+            StructField("b", BooleanType)))))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType)))))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(20, null)))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, null))))
+    }
+  }
+
+  test("Insert schema evolution: null struct in array with missing field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"a array<struct<c1:int,c2:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10), null)))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Seq(Row(1, true))), Row(1, Seq(Row(10, null), null))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in map value by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(
+          Row(0, Map("x" -> Row(1, true))),
+          Row(1, Map("y" -> Row(10, null)))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in map value by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(
+          Row(0, Map("x" -> Row(1, true))),
+          Row(1, Map("y" -> Row(10, null)))))
+    }
+  }
+
+  test("Insert schema evolution: extra and missing top-level column by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // Source has "active" (extra) but is missing "salary". Column count is the same (3)
+      // but names differ; by-name resolution should add "active" via schema evolution
+      // and fill "salary" with null.
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, "engineering", true)).toDF("id", "dep", "active"),
+          byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT id, salary, dep, active FROM $t1"),
+        Seq(Row(0, 100, "sales", null), Row(1, null, "engineering", true)))
+    }
+  }
+
+  test("Insert schema evolution: extra and missing nested struct field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+      doInsert(t1, targetData)
+
+      // Source struct has "c1", "c2", "c4" (extra) but is missing "c3". Field count is the same
+      // (3) but names differ; by-name resolution should add "c4" via schema evolution and fill
+      // "c3" with null.
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c4", DoubleType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b", 3.14)))), sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT id, s.c1, s.c2, s.c3, s.c4 FROM $t1"),
+        Seq(Row(0, 1, "a", true, null), Row(1, 10, "b", null, 3.14)))
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Negative tests: missing columns/fields should fail WITHOUT schema evolution
+  // ---------------------------------------------------------------------------
+
+  test("Insert without evolution: source missing top-level column by name fails") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // Without explicit DEFAULT on `salary`, missing by-name data only errors when null-fill
+      // for missing defaults is disabled; otherwise FILL mode inserts null for `salary`.
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+        checkError(
+          exception = intercept[AnalysisException] {
+            doInsertByName(t1, Seq((1, "engineering")).toDF("id", "dep"))
+          },
+          condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          parameters = Map(
+            "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+            "colName" -> "`salary`")
+        )
+      }
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column by position fails " +
+      "when null default disabled and column has no explicit DEFAULT") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+        withInsertNestedTypeCoercion {
+          checkError(
+            exception = intercept[AnalysisException] {
+              doInsertWithSchemaEvolution(t1,
+                Seq((1, 200)).toDF("id", "salary"))
+            },
+            condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+            parameters = Map(
+              "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+              "colName" -> "`dep`")
+          )
+        }
+      }
+    }
+  }
+
+  test("Insert schema evolution: source missing nested struct field by position fails " +
+      "when null default disabled") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+        withInsertNestedTypeCoercion {
+          checkError(
+            exception = intercept[AnalysisException] {
+              doInsertWithSchemaEvolution(t1, sourceData)
+            },
+            condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+            parameters = Map(
+              "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+              "colName" -> "`s`.`c3`")
+          )
+        }
+      }
+    }
+  }
+
+  test("Insert without evolution: source missing top-level column by position fails") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          doInsert(t1, Seq((1, 200)).toDF("id", "salary"))
+        },
+        condition = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+          "tableColumns" -> "`id`, `salary`, `dep`",
+          "dataColumns" -> "`id`, `salary`")
+      )
+    }
+  }
+
+  test("Insert without evolution: source missing nested struct field by name fails") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+      checkError(
+        exception = intercept[AnalysisException] {
+          doInsertByName(t1, sourceData)
+        },
+        condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        parameters = Map(
+          "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+          "colName" -> "`s`.`c3`")
+      )
+    }
+  }
+
+  test("Insert with evolution but without coercion flag:" +
+      " source missing nested struct field by name fails") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+      checkError(
+        exception = intercept[AnalysisException] {
+          doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+        },
+        condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        parameters = Map(
+          "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+          "colName" -> "`s`.`c3`")
+      )
+    }
+  }
+
+  test("Insert without evolution: source missing nested struct field by position fails") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+      checkError(
+        exception = intercept[AnalysisException] {
+          doInsert(t1, sourceData)
+        },
+        condition = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+        parameters = Map(
+          "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+          "colName" -> "`s`",
+          "missingFields" -> "`c3`")
+      )
+    }
+  }
 }
diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
index 2aa6cb885ca3..36fda2b50688 100644
--- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
+++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
@@ -659,6 +659,7 @@ spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio
 spark.sql.inMemoryColumnarStorage.hugeVectorThreshold
 spark.sql.inMemoryColumnarStorage.partitionPruning
 spark.sql.inMemoryTableScanStatistics.enable
+spark.sql.insertNestedTypeCoercion.enabled
 spark.sql.join.preferSortMergeJoin
 spark.sql.json.enableExactStringParsing
 spark.sql.json.enablePartialResults


