This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 9e27d319175d [SPARK-56550][SQL][4.2] Support source with fewer
columns/fields in INSERT INTO WITH SCHEMA EVOLUTION
9e27d319175d is described below
commit 9e27d319175dd6ac09ea0e2b578ab3fe1967fb81
Author: Szehon Ho <[email protected]>
AuthorDate: Wed May 20 21:07:14 2026 +0800
[SPARK-56550][SQL][4.2] Support source with fewer columns/fields in INSERT
INTO WITH SCHEMA EVOLUTION
## Summary
Backport of #55427 to `branch-4.2`.
Adds support for `INSERT INTO ... WITH SCHEMA EVOLUTION` to fill missing
nested struct fields with null (or column defaults) when the source has fewer
fields than the target table, mirroring the existing `MERGE INTO` behavior
(which is gated by `spark.sql.mergeNestedTypeCoercion.enabled`). The new
`INSERT` behavior is gated by its own flag,
`spark.sql.insertNestedTypeCoercion.enabled`.
Key changes:
- New config: `spark.sql.insertNestedTypeCoercion.enabled` (internal,
default `false`)
- Refactor `TableOutputResolver.resolveOutputColumns` to use
`DefaultValueFillMode` (`NONE`, `FILL`, `RECURSE`)
- Enable `RECURSE` mode for V2 inserts when schema evolution and the
coercion flag are both enabled
- 17 new tests in `InsertIntoSchemaEvolutionTests` (defined in
`InsertIntoTests.scala`)
## Why are the changes needed?
`MERGE INTO` already supports nested type coercion when the source has
fewer struct fields than the target. `INSERT INTO WITH SCHEMA EVOLUTION` lacked
this capability, causing errors for legitimate schema-evolution workflows where
older sources omit newer nested fields.
## Does this PR introduce _any_ user-facing change?
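Yes. When `spark.sql.insertNestedTypeCoercion.enabled` is set to `true`
(default `false`), `INSERT INTO ... WITH SCHEMA EVOLUTION` fills missing nested
struct fields with null (or column defaults) instead of failing. By-position
inserts under this flag also fill trailing missing top-level columns with their
default value (or null) instead of failing the column-count check.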
## How was this patch tested?
Cherry-picked cleanly (no conflicts) from #55427 onto the current `branch-4.2`
(`bd8872a0cc7`).
Original PR test plan:
- Added comprehensive positive/negative tests in `InsertIntoTests.scala`
- All matching tests passed on master
## Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor (Claude Opus 4)
Closes #56002 from szehon-ho/insert-schema-evolution-missing-fields-4.2.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 10 +-
.../catalyst/analysis/TableOutputResolver.scala | 124 +++-
.../org/apache/spark/sql/internal/SQLConf.scala | 15 +
.../spark/sql/execution/datasources/rules.scala | 2 +-
.../spark/sql/connector/InsertIntoTests.scala | 738 +++++++++++++++++++++
.../configs-without-binding-policy-exceptions | 1 +
6 files changed, 853 insertions(+), 37 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 09c6a0984258..faa78e030636 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkException,
SparkUnsupportedOperationException}
import org.apache.spark.internal.config.ConfigBindingPolicy
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst._
+import
org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode._
import org.apache.spark.sql.catalyst.analysis.resolver.{
AnalyzerBridgeState,
HybridAnalyzer,
@@ -3788,9 +3789,16 @@ class Analyzer(
validateStoreAssignmentPolicy()
TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
expected = v2Write.table.output, queryOutput = v2Write.query.output)
+ // With schema evolution + coercion flag, missing top-level columns
AND missing nested
+ // struct fields are filled with defaults/null (RECURSE mode).
Otherwise, only missing
+ // top-level columns are filled via FILL mode; missing nested struct
fields still cause
+ // schema enforcement errors.
+ val defaultValueFillMode =
+ if (conf.coerceInsertNestedTypes && v2Write.schemaEvolutionEnabled)
RECURSE
+ else FILL
val projection = TableOutputResolver.resolveOutputColumns(
v2Write.table.name, v2Write.table.output, v2Write.query,
v2Write.isByName, conf,
- supportColDefaultValue = true)
+ defaultValueFillMode)
if (projection != v2Write.query) {
val cleanedTable = v2Write.table match {
case r: DataSourceV2Relation =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 7eacc5ab9b2a..d691c449733f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -41,9 +41,10 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
/**
* Modes for filling in default or null values for missing columns.
- * If FILL, fill missing top-level columns with their default values.
- * If RECURSE, fill missing top-level columns and also recurse into nested
struct
- * fields to fill null.
+ * If FILL, fill missing top-level columns with their default values
(by-name reorder path).
+ * If RECURSE, fill missing top-level columns (including trailing columns on
the by-position
+ * path for INSERT with schema evolution when enabled) and recurse into
nested structs,
+ * arrays, and maps to fill missing struct fields with null or defaults.
* If NONE, do not fill any missing columns.
*/
object DefaultValueFillMode extends Enumeration {
@@ -92,19 +93,22 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
query: LogicalPlan,
byName: Boolean,
conf: SQLConf,
- supportColDefaultValue: Boolean = false): LogicalPlan = {
+ defaultValueFillMode: DefaultValueFillMode.Value = NONE): LogicalPlan = {
if (expected.size < query.output.size) {
throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
tableName, expected.map(_.name), query.output)
}
+ // In RECURSE mode, allow fewer source columns than target by filling
trailing columns
+ // with defaults. In other modes, a column count mismatch in by-position
resolution is
+ // an error.
+ val fillDefaultValue = defaultValueFillMode == RECURSE
val errors = new mutable.ArrayBuffer[String]()
val resolved: Seq[NamedExpression] = if (byName) {
- // If a top-level column does not have a corresponding value in the
input query, fill with
- // the column's default value. We need to pass `fillDefaultValue` as
FILL here, if the
- // `supportColDefaultValue` parameter is also true.
- val defaultValueFillMode = if (supportColDefaultValue) FILL else NONE
+ // By-name resolution: the defaultValueFillMode is passed through to
control whether
+ // missing top-level columns are filled (FILL/RECURSE) and whether
missing nested
+ // struct fields are also filled (RECURSE only).
reorderColumnsByName(
tableName,
query.output,
@@ -112,13 +116,15 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
conf,
errors += _,
Nil,
- defaultValueFillMode)
+ defaultValueFillMode,
+ enforceFullOutput = true)
} else {
- if (expected.size > query.output.size) {
+ if (expected.size > query.output.size && !fillDefaultValue) {
throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
tableName, expected.map(_.name), query.output)
}
- resolveColumnsByPosition(tableName, query.output, expected, conf, errors
+= _)
+ resolveColumnsByPosition(
+ tableName, query.output, expected, conf, errors += _, fillDefaultValue
= fillDefaultValue)
}
if (errors.nonEmpty) {
@@ -157,17 +163,17 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
case (valueType: StructType, colType: StructType) =>
val resolvedValue = resolveStructType(
tableName, value, valueType, col, colType,
- byName = true, conf, addError, colPath, fillChildDefaultValue)
+ byName = true, conf, addError, colPath, fillChildDefaultValue,
enforceFullOutput = false)
resolvedValue.getOrElse(value)
case (valueType: ArrayType, colType: ArrayType) =>
val resolvedValue = resolveArrayType(
tableName, value, valueType, col, colType,
- byName = true, conf, addError, colPath, fillChildDefaultValue)
+ byName = true, conf, addError, colPath, fillChildDefaultValue,
enforceFullOutput = false)
resolvedValue.getOrElse(value)
case (valueType: MapType, colType: MapType) =>
val resolvedValue = resolveMapType(
tableName, value, valueType, col, colType,
- byName = true, conf, addError, colPath, fillChildDefaultValue)
+ byName = true, conf, addError, colPath, fillChildDefaultValue,
enforceFullOutput = false)
resolvedValue.getOrElse(value)
case _ =>
checkUpdate(tableName, value, col, conf, addError, colPath)
@@ -304,7 +310,8 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
conf: SQLConf,
addError: String => Unit,
colPath: Seq[String] = Nil,
- defaultValueFillMode: DefaultValueFillMode.Value): Seq[NamedExpression]
= {
+ defaultValueFillMode: DefaultValueFillMode.Value,
+ enforceFullOutput: Boolean = false): Seq[NamedExpression] = {
val matchedCols = mutable.HashSet.empty[String]
val reordered = expectedCols.flatMap { expectedCol =>
val matched = inputCols.filter(col => conf.resolver(col.name,
expectedCol.name))
@@ -336,15 +343,15 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
case (matchedType: StructType, expectedType: StructType) =>
resolveStructType(
tableName, matchedCol, matchedType, actualExpectedCol,
expectedType,
- byName = true, conf, addError, newColPath, childFillDefaultValue)
+ byName = true, conf, addError, newColPath,
childFillDefaultValue, enforceFullOutput)
case (matchedType: ArrayType, expectedType: ArrayType) =>
resolveArrayType(
tableName, matchedCol, matchedType, actualExpectedCol,
expectedType,
- byName = true, conf, addError, newColPath, childFillDefaultValue)
+ byName = true, conf, addError, newColPath,
childFillDefaultValue, enforceFullOutput)
case (matchedType: MapType, expectedType: MapType) =>
resolveMapType(
tableName, matchedCol, matchedType, actualExpectedCol,
expectedType,
- byName = true, conf, addError, newColPath, childFillDefaultValue)
+ byName = true, conf, addError, newColPath,
childFillDefaultValue, enforceFullOutput)
case _ =>
checkField(
tableName, actualExpectedCol, matchedCol, byName = true, conf,
addError, newColPath)
@@ -366,6 +373,11 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
} else {
reordered
}
+ } else if (enforceFullOutput) {
+ val colName =
+ if (colPath.nonEmpty) colPath.quoted
+ else expectedCols.map(_.name).map(toSQLId).mkString(", ")
+ throw
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName,
colName)
} else {
Nil
}
@@ -377,7 +389,8 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
expectedCols: Seq[Attribute],
conf: SQLConf,
addError: String => Unit,
- colPath: Seq[String] = Nil): Seq[NamedExpression] = {
+ colPath: Seq[String] = Nil,
+ fillDefaultValue: Boolean = false): Seq[NamedExpression] = {
val actualExpectedCols = expectedCols.map { attr =>
attr.withDataType {
CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType) }
}
@@ -393,7 +406,7 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
tableName, colPath.quoted, extraColsStr
)
}
- } else if (inputCols.size < actualExpectedCols.size) {
+ } else if (inputCols.size < actualExpectedCols.size && !fillDefaultValue) {
val missingColsStr =
actualExpectedCols.takeRight(actualExpectedCols.size - inputCols.size)
.map(col => toSQLId(col.name))
.mkString(", ")
@@ -407,25 +420,48 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
}
}
- inputCols.zip(actualExpectedCols).flatMap { case (inputCol, expectedCol) =>
+ val matched = inputCols.zip(actualExpectedCols).flatMap { case (inputCol,
expectedCol) =>
val newColPath = colPath :+ expectedCol.name
(inputCol.dataType, expectedCol.dataType) match {
case (inputType: StructType, expectedType: StructType) =>
resolveStructType(
tableName, inputCol, inputType, expectedCol, expectedType,
- byName = false, conf, addError, newColPath, fillDefaultValue =
false)
+ byName = false, conf, addError, newColPath, fillDefaultValue,
enforceFullOutput = true)
case (inputType: ArrayType, expectedType: ArrayType) =>
resolveArrayType(
tableName, inputCol, inputType, expectedCol, expectedType,
- byName = false, conf, addError, newColPath, fillDefaultValue =
false)
+ byName = false, conf, addError, newColPath, fillDefaultValue,
enforceFullOutput = true)
case (inputType: MapType, expectedType: MapType) =>
resolveMapType(
tableName, inputCol, inputType, expectedCol, expectedType,
- byName = false, conf, addError, newColPath, fillDefaultValue =
false)
+ byName = false, conf, addError, newColPath, fillDefaultValue,
enforceFullOutput = true)
case _ =>
checkField(tableName, expectedCol, inputCol, byName = false, conf,
addError, newColPath)
}
}
+
+ val defaults = if (fillDefaultValue) {
+ actualExpectedCols.drop(inputCols.size).map { expectedCol =>
+ val defaultExpr = getDefaultValueExprOrNullLit(
+ expectedCol, conf.useNullsForMissingDefaultColumnValues)
+ if (defaultExpr.isEmpty) {
+ throw
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(
+ tableName, (colPath :+ expectedCol.name).quoted)
+ }
+ applyColumnMetadata(defaultExpr.get, expectedCol)
+ }
+ } else {
+ Nil
+ }
+
+ val result = matched ++ defaults
+ if (result.length != actualExpectedCols.size) {
+ val colName =
+ if (colPath.nonEmpty) colPath.quoted
+ else actualExpectedCols.map(_.name).map(toSQLId).mkString(", ")
+ throw
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName,
colName)
+ }
+ result
}
private[sql] def checkNullability(
@@ -447,6 +483,7 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
input.nullable && !attr.nullable && conf.storeAssignmentPolicy !=
StoreAssignmentPolicy.LEGACY
}
+ // scalastyle:off argcount
private def resolveStructType(
tableName: String,
input: Expression,
@@ -457,7 +494,8 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
conf: SQLConf,
addError: String => Unit,
colPath: Seq[String],
- fillDefaultValue: Boolean): Option[NamedExpression] = {
+ fillDefaultValue: Boolean,
+ enforceFullOutput: Boolean): Option[NamedExpression] = {
val nullCheckedInput = checkNullability(input, expected, conf, colPath)
val fields = inputType.zipWithIndex.map { case (f, i) =>
Alias(GetStructField(nullCheckedInput, i, Some(f.name)), f.name)()
@@ -465,10 +503,10 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
val defaultValueMode = if (fillDefaultValue) RECURSE else NONE
val resolved = if (byName) {
reorderColumnsByName(tableName, fields, toAttributes(expectedType),
conf, addError, colPath,
- defaultValueMode)
+ defaultValueMode, enforceFullOutput)
} else {
resolveColumnsByPosition(
- tableName, fields, toAttributes(expectedType), conf, addError, colPath)
+ tableName, fields, toAttributes(expectedType), conf, addError,
colPath, fillDefaultValue)
}
if (resolved.length == expectedType.length) {
val struct = CreateStruct(resolved)
@@ -478,6 +516,11 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
struct
}
Some(applyColumnMetadata(res, expected))
+ } else if (enforceFullOutput) {
+ val colName =
+ if (colPath.nonEmpty) colPath.quoted
+ else expectedType.fields.map(_.name).map(toSQLId).mkString(", ")
+ throw
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName,
colName)
} else {
None
}
@@ -493,7 +536,8 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
conf: SQLConf,
addError: String => Unit,
colPath: Seq[String],
- fillDefaultValue: Boolean): Option[NamedExpression] = {
+ fillDefaultValue: Boolean,
+ enforceFullOutput: Boolean): Option[NamedExpression] = {
val nullCheckedInput = checkNullability(input, expected, conf, colPath)
val param = NamedLambdaVariable("element", inputType.elementType,
inputType.containsNull)
val fakeAttr =
@@ -501,9 +545,10 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
val res = if (byName) {
val defaultValueMode = if (fillDefaultValue) RECURSE else NONE
reorderColumnsByName(tableName, Seq(param), Seq(fakeAttr), conf,
addError, colPath,
- defaultValueMode)
+ defaultValueMode, enforceFullOutput)
} else {
- resolveColumnsByPosition(tableName, Seq(param), Seq(fakeAttr), conf,
addError, colPath)
+ resolveColumnsByPosition(
+ tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath,
fillDefaultValue)
}
if (res.length == 1) {
val castedArray =
@@ -515,6 +560,9 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
ArrayTransform(nullCheckedInput, func)
}
Some(applyColumnMetadata(castedArray, expected))
+ } else if (enforceFullOutput) {
+ val colName = if (colPath.nonEmpty) colPath.quoted else
toSQLId(expected.name)
+ throw
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName,
colName)
} else {
None
}
@@ -530,7 +578,8 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
conf: SQLConf,
addError: String => Unit,
colPath: Seq[String],
- fillDefaultValue: Boolean): Option[NamedExpression] = {
+ fillDefaultValue: Boolean,
+ enforceFullOutput: Boolean): Option[NamedExpression] = {
val nullCheckedInput = checkNullability(input, expected, conf, colPath)
val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable =
false)
@@ -538,9 +587,10 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
val defaultValueFillMode = if (fillDefaultValue) RECURSE else NONE
val resKey = if (byName) {
reorderColumnsByName(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf,
addError, colPath,
- defaultValueFillMode)
+ defaultValueFillMode, enforceFullOutput)
} else {
- resolveColumnsByPosition(tableName, Seq(keyParam), Seq(fakeKeyAttr),
conf, addError, colPath)
+ resolveColumnsByPosition(
+ tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath,
fillDefaultValue)
}
val valueParam =
@@ -549,10 +599,10 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
AttributeReference("value", expectedType.valueType,
expectedType.valueContainsNull)()
val resValue = if (byName) {
reorderColumnsByName(tableName, Seq(valueParam), Seq(fakeValueAttr),
conf, addError, colPath,
- defaultValueFillMode)
+ defaultValueFillMode, enforceFullOutput)
} else {
resolveColumnsByPosition(
- tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError,
colPath)
+ tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError,
colPath, fillDefaultValue)
}
if (resKey.length == 1 && resValue.length == 1) {
@@ -577,10 +627,14 @@ object TableOutputResolver extends SQLConfHelper with
Logging {
MapFromArrays(newKeys, newValues)
}
Some(applyColumnMetadata(casted, expected))
+ } else if (enforceFullOutput) {
+ val colName = if (colPath.nonEmpty) colPath.quoted else
toSQLId(expected.name)
+ throw
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(tableName,
colName)
} else {
None
}
}
+ // scalastyle:on argcount
// For table insertions, capture the overflow errors and show proper message.
// Without this method, the overflow errors of castings will show hints for
turning off ANSI SQL
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fcb736e40485..29a734f05598 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -7270,6 +7270,18 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val INSERT_INTO_NESTED_TYPE_COERCION_ENABLED =
+ buildConf("spark.sql.insertNestedTypeCoercion.enabled")
+ .internal()
+ .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill
missing nested " +
+ "struct fields with null when the source has fewer nested fields than
the target " +
+ "table. Also relaxes by-position column-count enforcement so trailing
missing " +
+ "top-level columns are filled with their default value (or null). This
is " +
+ "experimental and the semantics may change.")
+ .version("4.2.0")
+ .booleanConf
+ .createWithDefault(false)
+
val TIME_TYPE_ENABLED =
buildConf("spark.sql.timeType.enabled")
.internal()
@@ -8597,6 +8609,9 @@ class SQLConf extends Serializable with Logging with
SqlApiConf {
def coerceMergeNestedTypes: Boolean =
getConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED)
+ def coerceInsertNestedTypes: Boolean =
+ getConf(SQLConf.INSERT_INTO_NESTED_TYPE_COERCION_ENABLED)
+
def isTimeTypeEnabled: Boolean = getConf(SQLConf.TIME_TYPE_ENABLED)
def listaggAllowDistinctCastWithOrder: Boolean =
getConf(LISTAGG_ALLOW_DISTINCT_CAST_WITH_ORDER)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index ce41bbe4aeb3..7122dd52ef1a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -528,7 +528,7 @@ object PreprocessTableInsertion extends
ResolveInsertionBase {
query,
byName,
conf,
- supportColDefaultValue = true)
+ TableOutputResolver.DefaultValueFillMode.FILL)
} catch {
case e: AnalysisException if staticPartCols.nonEmpty &&
(e.getCondition ==
"INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" ||
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
index 4f023136a6fe..42017c2dd60e 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
@@ -574,6 +574,26 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
byName: Boolean = false,
replaceWhere: Option[String] = None): Unit
+ /** Insert data into a table by name without schema evolution. */
+ protected def doInsertByName(
+ tableName: String,
+ insert: DataFrame,
+ mode: SaveMode = SaveMode.Append): Unit = {
+ val tmpView = "tmp_view"
+ withTempView(tmpView) {
+ insert.createOrReplaceTempView(tmpView)
+ val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO"
+ sql(s"INSERT $overwrite TABLE $tableName BY NAME SELECT * FROM $tmpView")
+ }
+ }
+
+ /** Run a block with INSERT nested type coercion enabled. */
+ protected def withInsertNestedTypeCoercion(f: => Unit): Unit = {
+ withSQLConf(SQLConf.INSERT_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
"true") {
+ f
+ }
+ }
+
test("Insert schema evolution: extra column - no auto-schema-evolution
capability") {
val t1 = s"${catalogAndNamespace}tbl"
withTable(t1) {
@@ -1403,4 +1423,722 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
assert(spark.table(t1).schema("id").dataType === IntegerType)
}
}
+
+ //
---------------------------------------------------------------------------
+ // Tests for source with fewer columns/fields than target
+ //
---------------------------------------------------------------------------
+
+ test("Insert schema evolution: source missing top-level column by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val schema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("salary", IntegerType),
+ StructField("dep", StringType)))
+ val data = Seq(Row(0, 100, "sales"))
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, spark.createDataFrame(spark.sparkContext.parallelize(data),
schema))
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, "engineering")).toDF("id", "dep"),
+ byName = true)
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, null, "engineering")))
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column by position")
{
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ // By position: source col 1 maps to target col 1, source col 2 maps to
target col 2,
+ // trailing target col 3 is filled with null.
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, 200)).toDF("id", "salary"))
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, 200, null)))
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column with DEFAULT
by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int DEFAULT 200, dep string)
USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, "engineering")).toDF("id", "dep"),
+ byName = true)
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, 200, "engineering")))
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column with DEFAULT
by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string DEFAULT
'unknown') USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, 200)).toDF("id", "salary"))
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, 200, "unknown")))
+ }
+ }
+
+ test("Insert schema evolution: source missing nested struct field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>)
USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))),
targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))),
sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+ }
+ }
+
+ test("Insert schema evolution: source missing nested struct field by
position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>)
USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))),
targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))),
sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in
array by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in
array by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+ }
+ }
+
+ test("Insert schema evolution: source missing deeply nested struct field by
name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", BooleanType)))))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)))))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(20, Row(30))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, Row(30, null)))))
+ }
+ }
+
+ test("Insert schema evolution: source with null struct and missing nested field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", IntegerType))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, null))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+ }
+ }
+
+ test("Insert schema evolution: source with null struct and missing nested field by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", IntegerType))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, null))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+ }
+ }
+
+ test("Insert schema evolution: mixed null and non-null structs with missing field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")), Row(2, null))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)), Row(2, null)))
+ }
+ }
+
+ test("Insert schema evolution: null deeply nested struct with missing field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", BooleanType)))))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)))))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(20, null)))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, null))))
+ }
+ }
+
+ test("Insert schema evolution: null struct in array with missing field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"a array<struct<c1:int,c2:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10), null)))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Seq(Row(1, true))), Row(1, Seq(Row(10, null), null))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in map value by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(
+ Row(0, Map("x" -> Row(1, true))),
+ Row(1, Map("y" -> Row(10, null)))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in map value by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(
+ Row(0, Map("x" -> Row(1, true))),
+ Row(1, Map("y" -> Row(10, null)))))
+ }
+ }
+
+ test("Insert schema evolution: extra and missing top-level column by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ // Source has "active" (extra) but is missing "salary". Column count is the same (3)
+ // but names differ; by-name resolution should add "active" via schema evolution
+ // and fill "salary" with null.
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, "engineering", true)).toDF("id", "dep", "active"),
+ byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT id, salary, dep, active FROM $t1"),
+ Seq(Row(0, 100, "sales", null), Row(1, null, "engineering", true)))
+ }
+ }
+
+ test("Insert schema evolution: extra and missing nested struct field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ // Source struct has "c1", "c2", "c4" (extra) but is missing "c3". Field count is the same
+ // (3) but names differ; by-name resolution should add "c4" via schema evolution and fill
+ // "c3" with null.
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c4", DoubleType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b", 3.14)))), sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT id, s.c1, s.c2, s.c3, s.c4 FROM $t1"),
+ Seq(Row(0, 1, "a", true, null), Row(1, 10, "b", null, 3.14)))
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Negative tests: missing columns/fields should fail WITHOUT schema evolution
+ // ---------------------------------------------------------------------------
+
+ test("Insert without evolution: source missing top-level column by name fails") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ // Without explicit DEFAULT on `salary`, missing by-name data only errors when null-fill
+ // for missing defaults is disabled; otherwise FILL mode inserts null for `salary`.
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsertByName(t1, Seq((1, "engineering")).toDF("id", "dep"))
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`salary`")
+ )
+ }
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column by position fails " +
+ "when null default disabled and column has no explicit DEFAULT") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+ withInsertNestedTypeCoercion {
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, 200)).toDF("id", "salary"))
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`dep`")
+ )
+ }
+ }
+ }
+ }
+
+ test("Insert schema evolution: source missing nested struct field by position fails " +
+ "when null default disabled") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+ withInsertNestedTypeCoercion {
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`s`.`c3`")
+ )
+ }
+ }
+ }
+ }
+
+ test("Insert without evolution: source missing top-level column by position fails") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsert(t1, Seq((1, 200)).toDF("id", "salary"))
+ },
+ condition = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "tableColumns" -> "`id`, `salary`, `dep`",
+ "dataColumns" -> "`id`, `salary`")
+ )
+ }
+ }
+
+ test("Insert without evolution: source missing nested struct field by name fails") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsertByName(t1, sourceData)
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`s`.`c3`")
+ )
+ }
+ }
+
+ test("Insert with evolution but without coercion flag:" +
+ " source missing nested struct field by name fails") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`s`.`c3`")
+ )
+ }
+ }
+
+ test("Insert without evolution: source missing nested struct field by position fails") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ checkError(
+ exception = intercept[AnalysisException] {
+ doInsert(t1, sourceData)
+ },
+ condition = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+ parameters = Map(
+ "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+ "colName" -> "`s`",
+ "missingFields" -> "`c3`")
+ )
+ }
+ }
}
diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
index 2aa6cb885ca3..36fda2b50688 100644
--- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
+++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
@@ -659,6 +659,7 @@ spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio
spark.sql.inMemoryColumnarStorage.hugeVectorThreshold
spark.sql.inMemoryColumnarStorage.partitionPruning
spark.sql.inMemoryTableScanStatistics.enable
+spark.sql.insertNestedTypeCoercion.enabled
spark.sql.join.preferSortMergeJoin
spark.sql.json.enableExactStringParsing
spark.sql.json.enablePartialResults
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]