This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 701d9e0d79 [spark] Unify Spark write column alignment via
PaimonOutputResolver (#7928)
701d9e0d79 is described below
commit 701d9e0d79b045f41ee4b3a7ce54a295d0c06e16
Author: Zouxxyy <[email protected]>
AuthorDate: Fri May 22 10:40:11 2026 +0800
[spark] Unify Spark write column alignment via PaimonOutputResolver (#7928)
Replace the ad-hoc column-alignment logic in `PaimonAnalysis` (a parallel,
top-level-only implementation of `TableOutputResolver`) with a dedicated
`PaimonOutputResolver` — a trimmed fork of `TableOutputResolver` with
`mergeSchemaEnabled` support that applies uniformly at every nesting depth.
- Missing target fields → NULL-fill at any depth
- Extra source fields → rejected when `mergeSchemaEnabled=false`; otherwise
  kept inline so `SchemaHelper.mergeSchema` evolves the table at write time
- Nullable downcast → `AssertNotNull`; type mismatch → tagged `Cast`
`paimonWriteResolved` now mirrors Spark's V2 `outputResolved` strictness
(name + type + nullability, including nested). A `PAIMON_WRITE_RESOLVED`
`TreeNodeTag` prevents re-entry when extras are kept inline.
---
.../spark/catalyst/analysis/PaimonAnalysis.scala | 361 ++-----------------
.../catalyst/analysis/PaimonOutputResolver.scala | 385 +++++++++++++++++++++
.../scala/org/apache/spark/sql/PaimonUtils.scala | 4 +
.../paimon/spark/SparkSchemaEvolutionITCase.java | 4 +-
.../org/apache/paimon/spark/sql/DDLTestBase.scala | 115 +++---
.../paimon/spark/sql/DataFrameWriteTestBase.scala | 270 ++++++++++++++-
.../spark/sql/InsertOverwriteTableTestBase.scala | 36 +-
.../paimon/spark/sql/V2WriteMergeSchemaTest.scala | 2 +-
.../paimon/spark/sql/WriteMergeSchemaTest.scala | 50 ++-
9 files changed, 804 insertions(+), 423 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
index 6009b6a807..d3ff947ce7 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
@@ -26,30 +26,39 @@ import
org.apache.paimon.spark.commands.{PaimonAnalyzeTableColumnCommand, Paimon
import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.table.FileStoreTable
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{PaimonUtils, SparkSession}
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, ResolvedTable}
-import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayTransform,
Attribute, CreateStruct, Expression, GetArrayItem, GetStructField, If, IsNull,
LambdaFunction, Literal, NamedExpression, NamedLambdaVariable}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits,
DataSourceV2Relation}
-import org.apache.spark.sql.types._
-
-import scala.collection.mutable
class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
import DataSourceV2Implicits._
+ import PaimonAnalysis._
override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsDown {
- case a @ PaimonV2WriteCommand(table) if !paimonWriteResolved(a.query,
table, a.isByName) =>
+ case a @ PaimonV2WriteCommand(table)
+ if !paimonWriteResolved(a.query, table) &&
+ a.query.getTagValue(PAIMON_WRITE_RESOLVED).isEmpty =>
val mergeSchemaEnabled =
writeOptions(a).get(SparkConnectorOptions.MERGE_SCHEMA.key()).contains("true")
||
OptionUtils.writeMergeSchemaEnabled()
- resolveQueryColumns(a.query, table, a.isByName, mergeSchemaEnabled)
match {
- case Some(newQuery) if newQuery != a.query =>
Compatibility.withNewQuery(a, newQuery)
- case _ => a
+ val newQuery = PaimonOutputResolver.resolveOutputColumns(
+ table.name,
+ table.output,
+ a.query,
+ a.isByName,
+ conf,
+ mergeSchemaEnabled)
+ if (newQuery ne a.query) {
+ // Tag to short-circuit the next Analyzer pass; otherwise inline-kept
extras would loop.
+ newQuery.setTagValue(PAIMON_WRITE_RESOLVED, ())
+ Compatibility.withNewQuery(a, newQuery)
+ } else {
+ a
}
case o @ PaimonDynamicPartitionOverwrite(r, d) if o.resolved =>
@@ -77,337 +86,25 @@ class PaimonAnalysis(session: SparkSession) extends
Rule[LogicalPlan] {
}
}
- private def paimonWriteResolved(
- query: LogicalPlan,
- table: NamedRelation,
- isByName: Boolean): Boolean = {
+ // Mirrors Spark's V2WriteCommand `outputResolved` strictness: query and
table outputs must match
+ // by name, position, type (ignoring nullable compatibility), and
nullability. Any nested
+ // structural differences also have to be reconciled before we declare the
write resolved.
+ private def paimonWriteResolved(query: LogicalPlan, table: NamedRelation):
Boolean = {
query.output.size == table.output.size &&
query.output.zip(table.output).forall {
case (inAttr, outAttr) =>
val inType =
CharVarcharUtils.getRawType(inAttr.metadata).getOrElse(inAttr.dataType)
val outType =
CharVarcharUtils.getRawType(outAttr.metadata).getOrElse(outAttr.dataType)
- inAttr.name == outAttr.name && schemaCompatible(inType, outType,
isByName)
- }
- }
-
- private def resolveQueryColumns(
- query: LogicalPlan,
- table: NamedRelation,
- byName: Boolean,
- mergeSchemaEnabled: Boolean = false): Option[LogicalPlan] = {
- // More details see: `TableOutputResolver#resolveOutputColumns`
- if (byName) {
- try {
- Option.apply(resolveQueryColumnsByName(query, table))
- } catch {
- case e: Exception =>
- // Merge schema is effective only when using byName mode.
- // Schema validation is skipped here, because schema validation or
merging will be
- // done during insertion when mergeSchemaEnabled.
- if (mergeSchemaEnabled) {
- Option.empty
- } else {
- throw e
- }
- }
- } else {
- Option.apply(resolveQueryColumnsByPosition(query, table))
- }
- }
-
- private def resolveQueryColumnsByName(query: LogicalPlan, table:
NamedRelation): LogicalPlan = {
- val inputCols = query.output
- val expectedCols = table.output
- if (inputCols.size > expectedCols.size) {
- throw new RuntimeException(
- s"Cannot write incompatible data for the table `${table.name}`, " +
- "the number of data columns don't match with the table schema's.")
- }
-
- val matchedCols = mutable.HashSet.empty[String]
- val reorderedCols = expectedCols.map {
- expectedCol =>
- val matched = inputCols.filter(col => conf.resolver(col.name,
expectedCol.name))
- if (matched.isEmpty) {
- // TODO: Support Spark default value framework if Paimon supports to
change default values.
- if (!expectedCol.nullable) {
- throw new RuntimeException(
- s"Cannot write incompatible data for the table `${table.name}`,
" +
- s"due to non-nullable column `${expectedCol.name}` has no
specified value.")
- }
- Alias(Literal(null, expectedCol.dataType), expectedCol.name)()
- } else if (matched.length > 1) {
- throw new RuntimeException(
- s"Cannot write incompatible data for the table `${table.name}`,
due to column name conflicts: ${matched
- .mkString(", ")}.")
- } else {
- matchedCols += matched.head.name
- val matchedCol = matched.head
- addCastToColumn(matchedCol, expectedCol, isByName = true)
- }
- }
-
- assert(reorderedCols.length == expectedCols.length)
- if (matchedCols.size < inputCols.length) {
- val extraCols = inputCols
- .filterNot(col => matchedCols.contains(col.name))
- .map(col => s"${toSQLId(col.name)}")
- .mkString(", ")
- // There are seme unknown column names
- throw new RuntimeException(
- s"Cannot write incompatible data for the table `${table.name}`, due to
unknown column names: $extraCols.")
- }
- Project(reorderedCols, query)
- }
-
- private def resolveQueryColumnsByPosition(
- query: LogicalPlan,
- table: NamedRelation): LogicalPlan = {
- val expectedCols = table.output
- val queryCols = query.output
- if (queryCols.size != expectedCols.size) {
- throw new RuntimeException(
- s"Cannot write incompatible data for the table `${table.name}`, " +
- "the number of data columns don't match with the table schema's.")
- }
-
- val project = queryCols.zipWithIndex.map {
- case (attr, i) =>
- val targetAttr = expectedCols(i)
- addCastToColumn(attr, targetAttr, isByName = false)
- }
- Project(project, query)
- }
-
- private def schemaCompatible(
- dataSchema: DataType,
- tableSchema: DataType,
- checkFieldNames: Boolean): Boolean = {
- (dataSchema, tableSchema) match {
- case (s1: StructType, s2: StructType) =>
- s1.length == s2.length &&
- (!checkFieldNames ||
- (!hasResolverConflicts(s1) &&
- !hasResolverConflicts(s2) &&
- structFieldsResolved(s1, s2))) &&
- s1.zip(s2).forall {
- case (d1, d2) => schemaCompatible(d1.dataType, d2.dataType,
checkFieldNames)
- }
- case (a1: ArrayType, a2: ArrayType) =>
- // todo: support array type nullable evaluation
- schemaCompatible(a1.elementType, a2.elementType, checkFieldNames)
- case (m1: MapType, m2: MapType) =>
- m1.valueContainsNull == m2.valueContainsNull &&
- schemaCompatible(m1.keyType, m2.keyType, checkFieldNames) &&
- schemaCompatible(m1.valueType, m2.valueType, checkFieldNames)
- case (d1, d2) => d1 == d2
- }
- }
-
- private def structFieldsResolved(source: StructType, target: StructType):
Boolean = {
- source.zip(target).forall {
- case (sourceField, targetField) =>
- conf.resolver(sourceField.name, targetField.name)
- }
- }
-
- private def hasResolverConflicts(struct: StructType): Boolean = {
- struct.fields.combinations(2).exists { case Array(a, b) =>
conf.resolver(a.name, b.name) }
- }
-
- private def addCastToColumn(
- attr: Attribute,
- targetAttr: Attribute,
- isByName: Boolean): NamedExpression = {
- val expr = (attr.dataType, targetAttr.dataType) match {
- case (s, t) if s == t =>
- attr
- case (s: StructType, t: StructType) if s != t =>
- if (isByName) {
- addCastToStructByName(attr, s, t)
- } else {
- addCastToStructByPosition(attr, s, t)
- }
- case (ArrayType(s: StructType, sNull: Boolean), ArrayType(t: StructType,
_: Boolean))
- if s != t =>
- val castToStructFunc = if (isByName) {
- addCastToStructByName _
- } else {
- addCastToStructByPosition _
- }
- castToArrayStruct(attr, s, t, sNull, castToStructFunc)
- case _ =>
- cast(attr, targetAttr.dataType)
- }
- Alias(stringLengthCheck(expr, targetAttr.metadata),
targetAttr.name)(explicitMetadata =
- Option(targetAttr.metadata))
- }
-
- private def addCastToStructByName(
- parent: NamedExpression,
- source: StructType,
- target: StructType): NamedExpression = {
- // Reject target fields that collide under the current resolver
- // (e.g. `name` and `Name` with case-insensitive resolution), otherwise
- // we would silently map both target fields to the same source field.
- val targetConflicts = target.fields
- .combinations(2)
- .collect {
- case Array(a, b) if conf.resolver(a.name, b.name) => (a.name, b.name)
- }
- .toSeq
- if (targetConflicts.nonEmpty) {
- throw new RuntimeException(
- "Cannot write incompatible data: nested struct has conflicting target
field names: " +
- targetConflicts.map { case (a, b) => s"`$a` vs `$b`" }.mkString(",
") + ".")
- }
-
- // Single pass: resolve each target field to its source match(es) and track
- // which source indices were consumed, so we can detect extras without
- // rescanning source and target repeatedly.
- val sourceWithIndex = source.fields.zipWithIndex
- val consumed = mutable.BitSet.empty
- val resolved = target.fields.map {
- tgt =>
- val matches = sourceWithIndex.filter { case (f, _) =>
conf.resolver(f.name, tgt.name) }
- matches.foreach { case (_, i) => consumed += i }
- (tgt, matches)
- }
-
- // If source struct has fields not in target, reject so that merge-schema
- // can handle the evolution instead of silently dropping the extra fields.
- val extraFields = sourceWithIndex.collect {
- case (f, i) if !consumed(i) => f.name
- }
- if (extraFields.nonEmpty) {
- throw new RuntimeException(
- s"Cannot write incompatible data: nested struct has extra fields:
${extraFields.mkString(", ")}.")
- }
-
- val fields = resolved.map {
- case (targetField, matches) =>
- val (sourceIndex, sourceField) = resolveSingleSourceField(matches,
targetField.name, source)
- (targetField.dataType, sourceField.dataType) match {
- case (nested: StructType, s: StructType) =>
- val subField = extractStructField(parent, sourceIndex,
sourceField.name, targetField)
- addCastToStructByName(subField, s, nested)
- case (_: StructType, o) =>
- throw new RuntimeException(s"Can not support to cast $o to
StructType.")
- case _ =>
- castStructField(parent, sourceIndex, sourceField.name, targetField)
- }
- }
- structAlias(fields, parent)
- }
-
- private def resolveSingleSourceField(
- matches: Array[(StructField, Int)],
- name: String,
- source: StructType): (Int, StructField) = {
- if (matches.length == 1) {
- val (field, index) = matches(0)
- (index, field)
- } else if (matches.isEmpty) {
- throw new RuntimeException(
- s"""Field "$name" does not exist in source struct type:
${source.simpleString}.""")
- } else {
- throw new RuntimeException(
- s"""Cannot resolve nested field "$name" due to name conflicts: """ +
- matches.map(_._1.name).mkString(", ") + ".")
+ inAttr.name == outAttr.name &&
+ PaimonUtils.equalsIgnoreCompatibleNullability(inType, outType) &&
+ (outAttr.nullable || !inAttr.nullable)
}
}
- private def addCastToStructByPosition(
- parent: NamedExpression,
- source: StructType,
- target: StructType): NamedExpression = {
- if (source.length != target.length) {
- throw new RuntimeException("The number of fields in source and target is
not same.")
- }
-
- val fields = target.zipWithIndex.map {
- case (targetField @ StructField(_, nested: StructType, _, _), i) =>
- val sourceField = source(i)
- sourceField.dataType match {
- case s: StructType =>
- val subField = castStructField(parent, i, sourceField.name,
targetField)
- addCastToStructByPosition(subField, s, nested)
- case o =>
- throw new RuntimeException(s"Can not support to cast $o to
StructType.")
- }
- case (targetField, i) =>
- val sourceField = source(i)
- castStructField(parent, i, sourceField.name, targetField)
- }
- structAlias(fields, parent)
- }
-
- private def structAlias(
- fields: Seq[NamedExpression],
- parent: NamedExpression): NamedExpression = {
- val struct = CreateStruct(fields)
- val res = if (parent.nullable) {
- If(IsNull(parent), Literal(null, struct.dataType), struct)
- } else {
- struct
- }
- Alias(res, parent.name)(parent.exprId, parent.qualifier,
Option(parent.metadata))
- }
-
- private def castStructField(
- parent: NamedExpression,
- i: Int,
- sourceFieldName: String,
- targetField: StructField): NamedExpression = {
- Alias(
- stringLengthCheck(
- cast(GetStructField(parent, i, Option(sourceFieldName)),
targetField.dataType),
- targetField.metadata),
- targetField.name)(explicitMetadata = Option(targetField.metadata))
- }
-
- private def extractStructField(
- parent: NamedExpression,
- i: Int,
- sourceFieldName: String,
- targetField: StructField): NamedExpression = {
- Alias(GetStructField(parent, i, Option(sourceFieldName)),
targetField.name)(
- explicitMetadata = Option(targetField.metadata))
- }
-
- private def castToArrayStruct(
- parent: NamedExpression,
- source: StructType,
- target: StructType,
- sourceNullable: Boolean,
- castToStructFunc: (NamedExpression, StructType, StructType) =>
NamedExpression
- ): Expression = {
- val structConverter: (Expression, Expression) => Expression = (_, i) =>
- castToStructFunc(Alias(GetArrayItem(parent, i), i.toString)(), source,
target)
- val transformLambdaFunc = {
- val elementVar = NamedLambdaVariable("elementVar", source,
sourceNullable)
- val indexVar = NamedLambdaVariable("indexVar", IntegerType, false)
- LambdaFunction(structConverter(elementVar, indexVar), Seq(elementVar,
indexVar))
- }
- ArrayTransform(parent, transformLambdaFunc)
- }
-
- private def cast(expr: Expression, dataType: DataType): Expression = {
- val cast = Compatibility.cast(expr, dataType,
Option(conf.sessionLocalTimeZone))
- cast.setTagValue(Compatibility.castByTableInsertionTag, ())
- cast
- }
+}
- private def stringLengthCheck(expr: Expression, metadata: Metadata):
Expression = {
- if (!conf.charVarcharAsString) {
- CharVarcharUtils
- .getRawType(metadata)
- .map(rawType => CharVarcharUtils.stringLengthCheck(expr, rawType))
- .getOrElse(expr)
- } else {
- expr
- }
- }
+object PaimonAnalysis {
+ val PAIMON_WRITE_RESOLVED: TreeNodeTag[Unit] =
TreeNodeTag[Unit]("paimon.write.resolved")
}
case class PaimonPostHocResolutionRules(session: SparkSession) extends
Rule[LogicalPlan] {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonOutputResolver.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonOutputResolver.scala
new file mode 100644
index 0000000000..7a8d4a878d
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonOutputResolver.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.paimon.spark.catalyst.Compatibility
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+import scala.collection.mutable
+
+/**
+ * Trimmed fork of Spark's `TableOutputResolver` with `mergeSchemaEnabled` for
schema evolution.
+ *
+ * - Missing target fields: NULL-fill at any depth.
+ * - Extra source fields: throw when `mergeSchemaEnabled = false`; otherwise
kept in the
+ * projection at any depth so `SchemaHelper.mergeSchema` evolves the table
at write time. Safe
+ * because `PaimonSparkTableBase` advertises `ACCEPT_ANY_SCHEMA`,
short-circuiting Spark's
+ * `outputResolved` check.
+ * - Nullable downcast: wrap with `AssertNotNull`.
+ * - Type mismatch: wrap with `Compatibility.cast` tagged for downstream
explicit-cast check.
+ */
+object PaimonOutputResolver {
+
+ def resolveOutputColumns(
+ tableName: String,
+ expected: Seq[Attribute],
+ query: LogicalPlan,
+ byName: Boolean,
+ conf: SQLConf,
+ mergeSchemaEnabled: Boolean): LogicalPlan = {
+ val resolved: Seq[NamedExpression] = if (byName) {
+ reorderColumnsByName(tableName, query.output, expected, conf,
mergeSchemaEnabled, Nil)
+ } else {
+ resolveColumnsByPosition(tableName, query.output, expected, conf, Nil)
+ }
+ if (resolved == query.output) {
+ query
+ } else {
+ Project(resolved, query)
+ }
+ }
+
+ private def reorderColumnsByName(
+ tableName: String,
+ inputCols: Seq[NamedExpression],
+ expectedCols: Seq[Attribute],
+ conf: SQLConf,
+ mergeSchemaEnabled: Boolean,
+ colPath: Seq[String]): Seq[NamedExpression] = {
+ val isTopLevel = colPath.isEmpty
+ val matchedNames = mutable.HashSet.empty[String]
+ val reordered = expectedCols.map {
+ expectedCol =>
+ val matches = inputCols.filter(col => conf.resolver(col.name,
expectedCol.name))
+ val newColPath = colPath :+ expectedCol.name
+ if (matches.isEmpty) {
+ nullFill(expectedCol)
+ } else if (matches.length > 1) {
+ throw new RuntimeException(
+ s"Cannot write to `$tableName`, " +
+ s"due to ambiguous column name `${newColPath.mkString(".")}`.")
+ } else {
+ matchedNames += matches.head.name
+ val actualExpectedCol = expectedCol.withDataType {
+
CharVarcharUtils.getRawType(expectedCol.metadata).getOrElse(expectedCol.dataType)
+ }
+ resolveField(
+ tableName,
+ matches.head,
+ actualExpectedCol,
+ byName = true,
+ conf,
+ mergeSchemaEnabled,
+ newColPath)
+ }
+ }
+
+ if (matchedNames.size < inputCols.length) {
+ val extras = inputCols.filterNot(col => matchedNames.contains(col.name))
+ if (!mergeSchemaEnabled) {
+ val extrasStr = extras.map(c => s"`${c.name}`").mkString(", ")
+ val msg = if (isTopLevel) {
+ s"Cannot write to `$tableName`, extra columns: $extrasStr"
+ } else {
+ s"Cannot write to `$tableName`, " +
+ s"extra struct fields at `${colPath.mkString(".")}`: $extrasStr"
+ }
+ throw new RuntimeException(msg)
+ }
+ reordered ++ extras.map(preserveAsAlias)
+ } else {
+ reordered
+ }
+ }
+
+ private def resolveColumnsByPosition(
+ tableName: String,
+ inputCols: Seq[NamedExpression],
+ expectedCols: Seq[Attribute],
+ conf: SQLConf,
+ colPath: Seq[String]): Seq[NamedExpression] = {
+ val actualExpectedCols = expectedCols.map {
+ attr =>
+ attr.withDataType {
+ CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType)
+ }
+ }
+ if (inputCols.size != actualExpectedCols.size) {
+ val where = if (colPath.isEmpty) {
+ s"`$tableName`"
+ } else {
+ s"`$tableName` at `${colPath.mkString(".")}`"
+ }
+ throw new RuntimeException(s"Cannot write to $where, " +
+ s"the number of data columns (${inputCols.size}) doesn't match the
table schema's (${actualExpectedCols.size}).")
+ }
+ inputCols.zip(actualExpectedCols).map {
+ case (inputCol, expectedCol) =>
+ resolveField(
+ tableName,
+ inputCol,
+ expectedCol,
+ byName = false,
+ conf,
+ mergeSchemaEnabled = false,
+ colPath :+ expectedCol.name)
+ }
+ }
+
+ private def resolveField(
+ tableName: String,
+ input: NamedExpression,
+ expected: Attribute,
+ byName: Boolean,
+ conf: SQLConf,
+ mergeSchemaEnabled: Boolean,
+ colPath: Seq[String]): NamedExpression = {
+ (input.dataType, expected.dataType) match {
+ case (sourceType: StructType, targetType: StructType) =>
+ resolveStructType(
+ tableName,
+ input,
+ sourceType,
+ expected,
+ targetType,
+ byName,
+ conf,
+ mergeSchemaEnabled,
+ colPath)
+ case (sourceType: ArrayType, targetType: ArrayType) =>
+ resolveArrayType(
+ tableName,
+ input,
+ sourceType,
+ expected,
+ targetType,
+ byName,
+ conf,
+ mergeSchemaEnabled,
+ colPath)
+ case (sourceType: MapType, targetType: MapType) =>
+ resolveMapType(
+ tableName,
+ input,
+ sourceType,
+ expected,
+ targetType,
+ byName,
+ conf,
+ mergeSchemaEnabled,
+ colPath)
+ case _ =>
+ checkField(input, expected, conf, colPath)
+ }
+ }
+
+ private def resolveStructType(
+ tableName: String,
+ input: Expression,
+ sourceType: StructType,
+ expected: Attribute,
+ targetType: StructType,
+ byName: Boolean,
+ conf: SQLConf,
+ mergeSchemaEnabled: Boolean,
+ colPath: Seq[String]): NamedExpression = {
+ val nullCheckedInput = checkNullability(input, expected, colPath)
+ val fields = sourceType.zipWithIndex.map {
+ case (f, i) =>
+ Alias(GetStructField(nullCheckedInput, i, Option(f.name)), f.name)()
+ }
+ val resolved = if (byName) {
+ reorderColumnsByName(
+ tableName,
+ fields,
+ toAttributes(targetType),
+ conf,
+ mergeSchemaEnabled,
+ colPath)
+ } else {
+ resolveColumnsByPosition(tableName, fields, toAttributes(targetType),
conf, colPath)
+ }
+ val struct = CreateStruct(resolved)
+ val res = if (nullCheckedInput.nullable) {
+ If(IsNull(nullCheckedInput), Literal(null, struct.dataType), struct)
+ } else {
+ struct
+ }
+ Alias(res, expected.name)(explicitMetadata = Option(expected.metadata))
+ }
+
+ private def resolveArrayType(
+ tableName: String,
+ input: Expression,
+ sourceType: ArrayType,
+ expected: Attribute,
+ targetType: ArrayType,
+ byName: Boolean,
+ conf: SQLConf,
+ mergeSchemaEnabled: Boolean,
+ colPath: Seq[String]): NamedExpression = {
+ val nullCheckedInput = checkNullability(input, expected, colPath)
+ val param = NamedLambdaVariable("element", sourceType.elementType,
sourceType.containsNull)
+ val fakeAttr =
+ AttributeReference("element", targetType.elementType,
targetType.containsNull)()
+ val resolved = if (byName) {
+ reorderColumnsByName(tableName, Seq(param), Seq(fakeAttr), conf,
mergeSchemaEnabled, colPath)
+ } else {
+ resolveColumnsByPosition(tableName, Seq(param), Seq(fakeAttr), conf,
colPath)
+ }
+ assert(resolved.length == 1)
+ val elementExpr = stripOuterAlias(resolved.head)
+ val transformed = if (elementExpr.fastEquals(param)) {
+ nullCheckedInput
+ } else {
+ ArrayTransform(nullCheckedInput, LambdaFunction(elementExpr, Seq(param)))
+ }
+ Alias(transformed, expected.name)(explicitMetadata =
Option(expected.metadata))
+ }
+
+ private def resolveMapType(
+ tableName: String,
+ input: Expression,
+ sourceType: MapType,
+ expected: Attribute,
+ targetType: MapType,
+ byName: Boolean,
+ conf: SQLConf,
+ mergeSchemaEnabled: Boolean,
+ colPath: Seq[String]): NamedExpression = {
+ val nullCheckedInput = checkNullability(input, expected, colPath)
+ val keyParam = NamedLambdaVariable("key", sourceType.keyType, nullable =
false)
+ val fakeKeyAttr = AttributeReference("key", targetType.keyType, nullable =
false)()
+ val resolvedKey = if (byName) {
+ reorderColumnsByName(
+ tableName,
+ Seq(keyParam),
+ Seq(fakeKeyAttr),
+ conf,
+ mergeSchemaEnabled,
+ colPath)
+ } else {
+ resolveColumnsByPosition(tableName, Seq(keyParam), Seq(fakeKeyAttr),
conf, colPath)
+ }
+
+ val valueParam =
+ NamedLambdaVariable("value", sourceType.valueType,
sourceType.valueContainsNull)
+ val fakeValueAttr =
+ AttributeReference("value", targetType.valueType,
targetType.valueContainsNull)()
+ val resolvedValue = if (byName) {
+ reorderColumnsByName(
+ tableName,
+ Seq(valueParam),
+ Seq(fakeValueAttr),
+ conf,
+ mergeSchemaEnabled,
+ colPath)
+ } else {
+ resolveColumnsByPosition(tableName, Seq(valueParam), Seq(fakeValueAttr),
conf, colPath)
+ }
+
+ assert(resolvedKey.length == 1 && resolvedValue.length == 1)
+ val keyExpr = stripOuterAlias(resolvedKey.head)
+ val valueExpr = stripOuterAlias(resolvedValue.head)
+ val transformed = if (keyExpr.fastEquals(keyParam) &&
valueExpr.fastEquals(valueParam)) {
+ nullCheckedInput
+ } else {
+ val newKeys = if (keyExpr.fastEquals(keyParam)) {
+ MapKeys(nullCheckedInput)
+ } else {
+ ArrayTransform(MapKeys(nullCheckedInput), LambdaFunction(keyExpr,
Seq(keyParam)))
+ }
+ val newValues = if (valueExpr.fastEquals(valueParam)) {
+ MapValues(nullCheckedInput)
+ } else {
+ ArrayTransform(MapValues(nullCheckedInput), LambdaFunction(valueExpr,
Seq(valueParam)))
+ }
+ MapFromArrays(newKeys, newValues)
+ }
+ Alias(transformed, expected.name)(explicitMetadata =
Option(expected.metadata))
+ }
+
+ private def checkField(
+ input: NamedExpression,
+ expected: Attribute,
+ conf: SQLConf,
+ colPath: Seq[String]): NamedExpression = {
+ val attrTypeHasCharVarchar =
CharVarcharUtils.hasCharVarchar(expected.dataType)
+ val attrTypeWithoutCharVarchar = if (attrTypeHasCharVarchar) {
+ CharVarcharUtils.replaceCharVarcharWithString(expected.dataType)
+ } else {
+ expected.dataType
+ }
+ val casted = if (input.dataType == attrTypeWithoutCharVarchar) {
+ input
+ } else {
+ addCast(input, attrTypeWithoutCharVarchar, conf)
+ }
+ val withStrLenCheck = if (conf.charVarcharAsString ||
!attrTypeHasCharVarchar) {
+ casted
+ } else {
+ CharVarcharUtils.stringLengthCheck(casted, expected.dataType)
+ }
+ val nullChecked = checkNullability(withStrLenCheck, expected, colPath)
+ Alias(nullChecked, expected.name)(explicitMetadata =
Option(expected.metadata))
+ }
+
+ private def checkNullability(
+ input: Expression,
+ expected: Attribute,
+ colPath: Seq[String]): Expression = {
+ if (input.nullable && !expected.nullable) {
+ AssertNotNull(input, colPath)
+ } else {
+ input
+ }
+ }
+
+ private def addCast(expr: Expression, dataType: DataType, conf: SQLConf):
Expression = {
+ val cast = Compatibility.cast(expr, dataType,
Option(conf.sessionLocalTimeZone))
+ cast.setTagValue(Compatibility.castByTableInsertionTag, ())
+ cast
+ }
+
+ private def nullFill(expected: Attribute): NamedExpression = {
+ Alias(Literal(null, expected.dataType), expected.name)(
+ explicitMetadata = Option(expected.metadata))
+ }
+
+ private def preserveAsAlias(expr: NamedExpression): NamedExpression = expr
match {
+ case a: Alias => a
+ case other =>
+ Alias(other, other.name)(explicitMetadata = Option(other.metadata))
+ }
+
+ private def stripOuterAlias(expr: Expression): Expression = expr match {
+ case a: Alias => a.child
+ case other => other
+ }
+
+ private def toAttributes(structType: StructType): Seq[Attribute] = {
+ structType.map(f => AttributeReference(f.name, f.dataType, f.nullable,
f.metadata)())
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index fe96ad2e14..d8b70f68f2 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -132,6 +132,10 @@ object PaimonUtils {
left.sameType(right)
}
+ def equalsIgnoreCompatibleNullability(from: DataType, to: DataType): Boolean
= {
+ DataType.equalsIgnoreCompatibleNullability(from, to)
+ }
+
def classIsLoadable(clazz: String): Boolean = {
SparkUtils.classIsLoadable(clazz)
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index 1cdf210d89..fc8b4adb6e 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -415,7 +415,7 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
() -> {
writeTable("testAlterColumnType", "(1, null,
'a')");
})
- .hasMessageContaining("Cannot write null to non-null
column(b)");
+ .hasStackTraceContaining("value appeared in non-nullable
field");
List<Row> beforeAlter = spark.sql("SHOW CREATE TABLE
testAlterColumnType").collectAsList();
assertThat(beforeAlter.toString())
@@ -437,7 +437,7 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
() -> {
writeTable("testAlterColumnType", "(1, null,
'a')");
})
- .hasMessageContaining("Cannot write null to non-null
column(b)");
+ .hasStackTraceContaining("value appeared in non-nullable
field");
}
@Test
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 47a280868d..b44561a09c 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -23,7 +23,6 @@ import org.apache.paimon.schema.Schema
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.types.DataTypes
-import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException
import org.junit.jupiter.api.Assertions
@@ -41,10 +40,10 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
withTable("T") {
sql("CREATE TABLE T (id INT NOT NULL, name STRING)")
- val e1 = intercept[SparkException] {
+ val e1 = intercept[Exception] {
sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (null, "c")""")
}
- Assertions.assertTrue(e1.getMessage().contains("Cannot write null to
non-null column"))
+ Assertions.assertTrue(e1.getMessage().contains("value appeared in
non-nullable field"))
sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (3, null)""")
checkAnswer(
@@ -65,15 +64,15 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
|TBLPROPERTIES ('primary-key' = 'id,pt')
|""".stripMargin)
- val e1 = intercept[SparkException] {
+ val e1 = intercept[Exception] {
sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", null)""")
}
- Assertions.assertTrue(e1.getMessage().contains("Cannot write null to
non-null column"))
+ Assertions.assertTrue(e1.getMessage().contains("value appeared in
non-nullable field"))
- val e2 = intercept[SparkException] {
+ val e2 = intercept[Exception] {
sql("""INSERT INTO T VALUES (1, "a", "pt1"), (null, "b", "pt2")""")
}
- Assertions.assertTrue(e2.getMessage().contains("Cannot write null to
non-null column"))
+ Assertions.assertTrue(e2.getMessage().contains("value appeared in
non-nullable field"))
sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", "pt1"), (3, null,
"pt2")""")
checkAnswer(
@@ -340,65 +339,69 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
test("Paimon DDL: CREATE OR REPLACE TABLE AS SELECT on partitioned table") {
assume(gteqSpark3_4)
withTable("t") {
- sql("""
- |CREATE TABLE t (id BIGINT, data STRING, pt STRING)
- |USING paimon
- |PARTITIONED BY (pt)
- |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '2')
- |""".stripMargin)
- sql("INSERT INTO t VALUES (1, 'old', 'p0')")
- val oldLocation = loadTable("t").location().toString
- Seq((2L, "x2", "p1"), (3L, "x3", "p2"))
- .toDF("id", "data", "pt")
- .createOrReplaceTempView("source")
+ withTempView("source") {
+ sql("""
+ |CREATE TABLE t (id BIGINT, data STRING, pt STRING)
+ |USING paimon
+ |PARTITIONED BY (pt)
+ |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '2')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'old', 'p0')")
+ val oldLocation = loadTable("t").location().toString
+ Seq((2L, "x2", "p1"), (3L, "x3", "p2"))
+ .toDF("id", "data", "pt")
+ .createOrReplaceTempView("source")
- sql("""
- |CREATE OR REPLACE TABLE t
- |USING paimon
- |PARTITIONED BY (pt)
- |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '3')
- |AS SELECT * FROM source
- |""".stripMargin)
+ sql("""
+ |CREATE OR REPLACE TABLE t
+ |USING paimon
+ |PARTITIONED BY (pt)
+ |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '3')
+ |AS SELECT * FROM source
+ |""".stripMargin)
- val replaced = loadTable("t")
- Assertions.assertEquals(oldLocation, replaced.location().toString)
- Assertions.assertEquals("3", replaced.options().get("bucket"))
- checkAnswer(
- sql("SELECT * FROM t ORDER BY id"),
- Seq((2L, "x2", "p1"), (3L, "x3", "p2")).toDF())
+ val replaced = loadTable("t")
+ Assertions.assertEquals(oldLocation, replaced.location().toString)
+ Assertions.assertEquals("3", replaced.options().get("bucket"))
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY id"),
+ Seq((2L, "x2", "p1"), (3L, "x3", "p2")).toDF())
+ }
}
}
test("Paimon DDL: CREATE OR REPLACE TABLE AS SELECT supports incompatible
schema") {
assume(gteqSpark3_4)
withTable("t") {
- sql("""
- |CREATE TABLE t (id BIGINT, data STRING)
- |USING paimon
- |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
- |""".stripMargin)
- sql("INSERT INTO t VALUES (1, 'old')")
- val oldLocation = loadTable("t").location().toString
- val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId()
- Seq(("2", 20), ("3", 30)).toDF("id",
"amount").createOrReplaceTempView("source")
+ withTempView("source") {
+ sql("""
+ |CREATE TABLE t (id BIGINT, data STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'old')")
+ val oldLocation = loadTable("t").location().toString
+ val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId()
+ Seq(("2", 20), ("3", 30)).toDF("id",
"amount").createOrReplaceTempView("source")
- sql("""
- |CREATE OR REPLACE TABLE t
- |USING paimon
- |TBLPROPERTIES ('bucket' = '-1')
- |AS SELECT * FROM source
- |""".stripMargin)
+ sql("""
+ |CREATE OR REPLACE TABLE t
+ |USING paimon
+ |TBLPROPERTIES ('bucket' = '-1')
+ |AS SELECT * FROM source
+ |""".stripMargin)
- val replaced = loadTable("t")
- Assertions.assertEquals(oldLocation, replaced.location().toString)
- Assertions.assertEquals("-1", replaced.options().get("bucket"))
- Assertions.assertEquals(Seq("id", "amount"),
spark.table("t").schema.fieldNames.toSeq)
- Assertions.assertEquals("string",
spark.table("t").schema("id").dataType.typeName)
- Assertions.assertEquals("integer",
spark.table("t").schema("amount").dataType.typeName)
- checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(("2", 20), ("3",
30)).toDF())
- checkAnswer(
- sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"),
- Seq((1L, "old")).toDF())
+ val replaced = loadTable("t")
+ Assertions.assertEquals(oldLocation, replaced.location().toString)
+ Assertions.assertEquals("-1", replaced.options().get("bucket"))
+ Assertions.assertEquals(Seq("id", "amount"),
spark.table("t").schema.fieldNames.toSeq)
+ Assertions.assertEquals("string",
spark.table("t").schema("id").dataType.typeName)
+ Assertions.assertEquals("integer",
spark.table("t").schema("amount").dataType.typeName)
+ checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(("2", 20), ("3",
30)).toDF())
+ checkAnswer(
+ sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"),
+ Seq((1L, "old")).toDF())
+ }
}
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
index 01def579ef..87d80cb897 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
-import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.sql.types.{DecimalType, StructType}
import org.junit.jupiter.api.Assertions
import java.sql.{Date, Timestamp}
@@ -366,6 +366,130 @@ abstract class DataFrameWriteTestBase extends
PaimonSparkTestBase {
}
}
+ test("Paimon: DataFrameWrite.saveAsTable should recursively align nested
fields by name") {
+ for (useV2Write <- Seq("true", "false")) {
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) {
+ withTable("source", "target") {
+ sql("""
+ |CREATE TABLE source (
+ | id INT,
+ | outer STRUCT<inner_arr: ARRAY<STRUCT<flag: BOOLEAN, label:
STRING>>>,
+ | nested_arr ARRAY<ARRAY<STRUCT<flag: BOOLEAN, label:
STRING>>>,
+ | nested_map MAP<STRING, STRUCT<flag: BOOLEAN, label: STRING>>
+ |) USING paimon
+ |""".stripMargin)
+
+ sql("""
+ |CREATE TABLE target (
+ | id INT,
+ | outer STRUCT<inner_arr: ARRAY<STRUCT<label: STRING, flag:
BOOLEAN>>>,
+ | nested_arr ARRAY<ARRAY<STRUCT<label: STRING, flag:
BOOLEAN>>>,
+ | nested_map MAP<STRING, STRUCT<label: STRING, flag: BOOLEAN>>
+ |) USING paimon
+ |""".stripMargin)
+
+ sql("""
+ |INSERT INTO source VALUES (
+ | 1,
+ | named_struct('inner_arr', array(named_struct('flag', true,
'label', 'outer'))),
+ | array(array(named_struct('flag', false, 'label', 'array'))),
+ | map('k', named_struct('flag', true, 'label', 'map'))
+ |)
+ |""".stripMargin)
+
+
spark.table("source").write.format("paimon").mode("append").saveAsTable("target")
+
+ checkAnswer(
+ sql("""
+ |SELECT
+ | outer.inner_arr[0].label,
+ | outer.inner_arr[0].flag,
+ | nested_arr[0][0].label,
+ | nested_arr[0][0].flag,
+ | nested_map['k'].label,
+ | nested_map['k'].flag
+ |FROM target
+ |""".stripMargin),
+ Seq(Row("outer", true, "array", false, "map", true))
+ )
+ }
+ }
+ }
+ }
+
+ test("Paimon: SQL insert should recursively keep nested fields by-position
semantics") {
+ withSparkSQLConf("spark.sql.ansi.enabled" -> "false") {
+ withTable("source", "target") {
+ sql("""
+ |CREATE TABLE source (
+ | id INT,
+ | outer STRUCT<inner_arr: ARRAY<STRUCT<flag: BOOLEAN, label:
STRING>>>,
+ | nested_arr ARRAY<ARRAY<STRUCT<flag: BOOLEAN, label: STRING>>>,
+ | nested_map MAP<STRING, STRUCT<flag: BOOLEAN, label: STRING>>
+ |) USING paimon
+ |""".stripMargin)
+
+ sql("""
+ |CREATE TABLE target (
+ | id INT,
+ | outer STRUCT<inner_arr: ARRAY<STRUCT<label: STRING, flag:
BOOLEAN>>>,
+ | nested_arr ARRAY<ARRAY<STRUCT<label: STRING, flag: BOOLEAN>>>,
+ | nested_map MAP<STRING, STRUCT<label: STRING, flag: BOOLEAN>>
+ |) USING paimon
+ |""".stripMargin)
+
+ sql("""
+ |INSERT INTO source VALUES (
+ | 1,
+ | named_struct('inner_arr', array(named_struct('flag', true,
'label', 'outer'))),
+ | array(array(named_struct('flag', false, 'label', 'array'))),
+ | map('k', named_struct('flag', true, 'label', 'map'))
+ |)
+ |""".stripMargin)
+
+ sql("INSERT INTO target SELECT * FROM source")
+
+ checkAnswer(
+ sql("""
+ |SELECT
+ | outer.inner_arr[0].label,
+ | outer.inner_arr[0].flag,
+ | nested_arr[0][0].label,
+ | nested_arr[0][0].flag,
+ | nested_map['k'].label,
+ | nested_map['k'].flag
+ |FROM target
+ |""".stripMargin),
+ Seq(Row("true", null, "false", null, "true", null))
+ )
+ }
+ }
+ }
+
+ test("Paimon: DataFrameWrite.saveAsTable should reject incompatible struct
casts") {
+ withTable("source", "target") {
+ sql("CREATE TABLE source (id INT, payload STRUCT<value: INT>) USING
paimon")
+ sql("CREATE TABLE target (id INT, payload INT) USING paimon")
+ sql("INSERT INTO source VALUES (1, named_struct('value', 1))")
+
+ val message = intercept[Exception] {
+
spark.table("source").write.format("paimon").mode("append").saveAsTable("target")
+ }.getMessage
+ assert(message.toLowerCase.contains("cast"))
+ }
+
+ withTable("source", "target") {
+ sql("CREATE TABLE source (id INT, payload INT) USING paimon")
+ sql("CREATE TABLE target (id INT, payload STRUCT<value: INT>) USING
paimon")
+ sql("INSERT INTO source VALUES (1, 1)")
+
+ val message = intercept[Exception] {
+
spark.table("source").write.format("paimon").mode("append").saveAsTable("target")
+ }.getMessage
+ assert(message.toLowerCase.contains("cast"))
+ }
+ }
+
withPk.foreach {
hasPk =>
bucketModes.foreach {
@@ -784,4 +908,148 @@ abstract class DataFrameWriteTestBase extends
PaimonSparkTestBase {
null) :: Nil
checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected3)
}
+
+ test("Paimon write merge-schema conflict: deep nested array element bigint
-> string") {
+ for (useV2Write <- Seq("true", "false")) {
+ withSparkSQLConf(
+ "spark.paimon.write.use-v2-write" -> useV2Write,
+ "spark.paimon.write.merge-schema.explicit-cast" -> "true") {
+ withTable("target") {
+ sql("""
+ |CREATE TABLE target (
+ | id STRING,
+ | data STRUCT<wind: STRUCT<impactPrts: ARRAY<STRING>>>
+ |) USING paimon
+ |""".stripMargin)
+ sql("""
+ |INSERT INTO target VALUES (
+ | 'r0',
+ | named_struct('wind', named_struct('impactPrts', array('p0',
'p1')))
+ |)
+ |""".stripMargin)
+
+ val sourceSchema =
+ StructType.fromDDL("id STRING, data STRUCT<wind:
STRUCT<impactPrts: ARRAY<BIGINT>>>")
+ val sourceDf = spark.createDataFrame(
+ java.util.Arrays.asList(Row("r1",
Row(Row(java.util.Arrays.asList(10L, 20L))))),
+ sourceSchema)
+ sourceDf.write
+ .format("paimon")
+ .mode("append")
+ .option("write.merge-schema", "true")
+ .saveAsTable("target")
+
+ checkAnswer(
+ sql("SELECT id, data.wind.impactPrts FROM target ORDER BY id"),
+ Seq(
+ Row("r0", Array("p0", "p1")),
+ Row("r1", Array("10", "20"))
+ )
+ )
+ }
+ }
+ }
+ }
+
+ test("Paimon write merge-schema conflict: top-level same-name column string
vs bigint") {
+ for (useV2Write <- Seq("true", "false")) {
+ withSparkSQLConf(
+ "spark.paimon.write.use-v2-write" -> useV2Write,
+ "spark.paimon.write.merge-schema.explicit-cast" -> "true") {
+ withTable("target") {
+ sql("CREATE TABLE target (id STRING, value BIGINT) USING paimon")
+ sql("INSERT INTO target VALUES ('r0', 1000L)")
+
+ val sourceSchema = StructType.fromDDL("id STRING, value STRING")
+ val sourceDf =
+ spark.createDataFrame(java.util.Arrays.asList(Row("r1", "2000")),
sourceSchema)
+ sourceDf.write
+ .format("paimon")
+ .mode("append")
+ .option("write.merge-schema", "true")
+ .saveAsTable("target")
+
+ checkAnswer(
+ sql("SELECT id, value FROM target ORDER BY id"),
+ Seq(Row("r0", 1000L), Row("r1", 2000L))
+ )
+ }
+ }
+ }
+ }
+
+ test("Paimon write merge-schema conflict: deep nested struct widening int ->
bigint") {
+ for (useV2Write <- Seq("true", "false")) {
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) {
+ withTable("target") {
+ sql("""
+ |CREATE TABLE target (
+ | id STRING,
+ | payload STRUCT<
+ | inner: STRUCT<
+ | items: ARRAY<STRUCT<
+ | prtId: BIGINT,
+ | seqNum: BIGINT,
+ | missing: ARRAY<STRUCT<
+ | port: BIGINT,
+ | terminal: BIGINT
+ | >>
+ | >>
+ | >
+ | >
+ |) USING paimon
+ |""".stripMargin)
+ sql("""
+ |INSERT INTO target VALUES (
+ | 'r0',
+ | named_struct('inner', named_struct('items',
array(named_struct(
+ | 'prtId', 10L,
+ | 'seqNum', 1L,
+ | 'missing', array(named_struct('port', 100L, 'terminal',
200L))
+ | ))))
+ |)
+ |""".stripMargin)
+
+ val sourceSchema = StructType.fromDDL(
+ "id STRING, " +
+ "payload STRUCT<" +
+ " inner: STRUCT<" +
+ " items: ARRAY<STRUCT<" +
+ " prtId: INT, " +
+ " seqNum: INT, " +
+ " missing: ARRAY<STRUCT<port: INT, terminal: INT>>" +
+ " >>" +
+ " >" +
+ ">")
+ val sourceDf = spark.createDataFrame(
+ java.util.Arrays.asList(Row(
+ "r1",
+ Row(
+ Row(java.util.Arrays.asList(Row(20, 2,
java.util.Arrays.asList(Row(300, 400)))))))),
+ sourceSchema)
+ sourceDf.write
+ .format("paimon")
+ .mode("append")
+ .option("write.merge-schema", "true")
+ .saveAsTable("target")
+
+ checkAnswer(
+ sql("""
+ |SELECT
+ | id,
+ | payload.inner.items[0].prtId,
+ | payload.inner.items[0].seqNum,
+ | payload.inner.items[0].missing[0].port,
+ | payload.inner.items[0].missing[0].terminal
+ |FROM target ORDER BY id
+ |""".stripMargin),
+ Seq(
+ Row("r0", 10L, 1L, 100L, 200L),
+ Row("r1", 20L, 2L, 300L, 400L)
+ )
+ )
+ }
+ }
+ }
+ }
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
index baecdaf997..2780a49e53 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
@@ -92,25 +92,22 @@ abstract class InsertOverwriteTableTestBase extends
PaimonSparkTestBase {
val msg1 = intercept[Exception] {
sql("INSERT INTO TABLE t1 BY NAME SELECT col1, col2 as col1
FROM t1")
}
- assert(msg1.getMessage.contains("due to column name
conflicts"))
+ assert(msg1.getMessage.contains("due to ambiguous column name
`col1`"))
// name does not match
val msg2 = intercept[Exception] {
sql("INSERT INTO TABLE t1 BY NAME SELECT col1, col2 as colx
FROM t1")
}
- assert(msg2.getMessage.contains("due to unknown column names"))
+ assert(msg2.getMessage.contains("extra columns: `colx`"))
// query column size bigger than table's
val msg3 = intercept[Exception] {
sql("INSERT INTO TABLE t1 BY NAME SELECT col1, col2, col3,
col4, col4 as col5 FROM t1")
}
- assert(
- msg3.getMessage.contains(
- "the number of data columns don't match with the table
schema"))
+ assert(msg3.getMessage.contains("extra columns: `col5`"))
// non-nullable column has no specified value
val msg4 = intercept[Exception] {
sql("INSERT INTO TABLE t2 BY NAME SELECT col2 FROM t2")
}
- assert(
- msg4.getMessage.contains("non-nullable column `col1` has no
specified value"))
+ assert(msg4.getMessage.contains("Cannot write null to non-null
column(col1)"))
// by position
// column size does not match
@@ -119,7 +116,7 @@ abstract class InsertOverwriteTableTestBase extends
PaimonSparkTestBase {
}
assert(
msg5.getMessage.contains(
- "the number of data columns don't match with the table
schema"))
+ "the number of data columns (1) doesn't match the table
schema's (4)"))
}
}
}
@@ -209,28 +206,7 @@ abstract class InsertOverwriteTableTestBase extends
PaimonSparkTestBase {
val msg = intercept[Exception] {
sql("INSERT INTO t2 BY NAME SELECT * FROM t1")
}.getMessage
- assert(msg.contains("name conflicts"))
- }
- }
- }
- }
-
- test("Paimon: insert by name rejects nested struct target fields colliding
under resolver") {
- assume(gteqSpark3_5)
- withSparkSQLConf("spark.sql.caseSensitive" -> "true") {
- withTable("t1", "t2") {
- spark.sql("""CREATE TABLE t1 (id INT NOT NULL, info STRUCT<name:
STRING>)
- |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
- // target has both `name` and `Name`, legal when session is
case-sensitive
- spark.sql("""CREATE TABLE t2 (id INT NOT NULL, info STRUCT<name:
STRING, Name: STRING>)
- |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
- sql("INSERT INTO t1 VALUES (1, struct('Alice'))")
-
- withSparkSQLConf("spark.sql.caseSensitive" -> "false") {
- val msg = intercept[Exception] {
- sql("INSERT INTO t2 BY NAME SELECT * FROM t1")
- }.getMessage
- assert(msg.contains("conflicting target field names"))
+ assert(msg.contains("due to ambiguous column name `info.name`"))
}
}
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
index c73048f7d0..2a17d6297e 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
@@ -106,7 +106,7 @@ class V2WriteMergeSchemaTest extends PaimonSparkTestBase {
val error = intercept[RuntimeException] {
spark.sql("INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c")
}.getMessage
- assert(error.contains("the number of data columns don't match with the
table schema's"))
+ assert(error.contains("extra columns: `c`"))
}
}
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
index 6be87a82fa..83b423438a 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
@@ -110,7 +110,7 @@ class WriteMergeSchemaTest extends PaimonSparkTestBase {
val error = intercept[RuntimeException] {
spark.sql("INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c")
}.getMessage
- assert(error.contains("the number of data columns don't match with the
table schema's"))
+ assert(error.contains("extra columns"))
}
}
}
@@ -498,4 +498,52 @@ class WriteMergeSchemaTest extends PaimonSparkTestBase {
}
}
}
+
+ test("Write merge schema: array of struct missing nested field by
dataframe") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (
+ | ids ARRAY<BIGINT>,
+ | a BIGINT,
+ | b BIGINT,
+ | items ARRAY<STRUCT<
+ | f1: INT,
+ | f2: STRING,
+ | f3: STRING,
+ | f4: STRING,
+ | f5: STRING,
+ | f6: INT>>)
+ |""".stripMargin)
+
+ val sourceDataFrame = sql("""
+ |SELECT
+ | array(1, 2) AS ids,
+ | 1 AS a,
+ | 2 AS b,
+ | array(named_struct(
+ | 'f1', 10,
+ | 'f2', 'v2',
+ | 'f3', 'v3',
+ | 'f4', 'v4',
+ | 'f5', 'v5')) AS items
+ |""".stripMargin)
+
+ val alignedDataFrame = sourceDataFrame.select(
+ sourceDataFrame("ids").cast("ARRAY<BIGINT>").as("ids"),
+ sourceDataFrame("a").cast("BIGINT").as("a"),
+ sourceDataFrame("b").cast("BIGINT").as("b"),
+ sourceDataFrame("items")
+ )
+
+ alignedDataFrame.write
+ .format("paimon")
+ .mode("append")
+ .option("write.merge-schema", "true")
+ .saveAsTable("t")
+
+ checkAnswer(
+ sql("SELECT * FROM t"),
+ Seq(Row(Seq(1L, 2L), 1L, 2L, Seq(Row(10, "v2", "v3", "v4", "v5",
null)))))
+ }
+ }
}