This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 347c45efa5 [spark] Support merge schema in MERGE INTO (#7789)
347c45efa5 is described below
commit 347c45efa52082dadb2b99f4ad730f9123f1a7ca
Author: Zouxxyy <[email protected]>
AuthorDate: Mon May 11 16:22:59 2026 +0800
[spark] Support merge schema in MERGE INTO (#7789)
Add schema evolution support for MERGE INTO and fix nested-field
alignment.
- With `spark.paimon.write.merge-schema=true`, `UPDATE *` / `INSERT *`
evolve the target schema with new source columns. For the new columns, star
clauses pull values from the source by name; explicit clauses fill NULL.
- A `FROM_STAR` `TreeNodeTag` preserves the original star intent, so a
fully-listed explicit clause is not mistaken for `*`.
- `AssignmentAlignmentHelper` now reorders nested struct / array<struct>
/ map fields by name to match the target field order.
---
.../analysis/AssignmentAlignmentHelper.scala | 180 --------
.../analysis/PaimonMergeIntoResolverBase.scala | 5 +-
.../apache/spark/sql/paimon/shims/Spark4Shim.scala | 24 +-
.../analysis/AssignmentAlignmentHelper.scala | 104 ++++-
.../analysis/MergeSchemaEvolutionHelper.scala | 153 +++++++
.../catalyst/analysis/PaimonMergeActionTags.scala | 46 ++
.../catalyst/analysis/PaimonMergeIntoBase.scala | 91 ++--
.../analysis/PaimonMergeIntoResolverBase.scala | 5 +-
.../paimon/spark/commands/SchemaHelper.scala | 127 +++--
.../apache/spark/sql/paimon/shims/SparkShim.scala | 13 +-
.../sql/MergeIntoNotMatchedBySourceTest.scala | 37 ++
.../paimon/spark/sql/MergeIntoTableTestBase.scala | 510 +++++++++++++++++++++
.../apache/spark/sql/paimon/shims/Spark3Shim.scala | 24 +-
.../analysis/Spark41MergeIntoRewrite.scala | 18 +-
.../apache/spark/sql/paimon/shims/Spark4Shim.scala | 24 +-
15 files changed, 1059 insertions(+), 302 deletions(-)
diff --git
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
deleted file mode 100644
index f61ed71b31..0000000000
---
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.catalyst.analysis
-
-import
org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY
-import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
CreateNamedStruct, Expression, GetStructField, Literal, NamedExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction,
InsertAction, InsertStarAction, MergeAction, MergeIntoTable, UpdateAction,
UpdateStarAction}
-import org.apache.spark.sql.types.StructType
-
-trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
-
- private lazy val resolver = conf.resolver
-
- /**
- * @param ref
- * attribute reference seq, e.g. a => Seq["a"], s.c1 => Seq["s", "c1"]
- * @param expr
- * update expression
- */
- private case class AttrUpdate(ref: Seq[String], expr: Expression)
-
- /**
- * Generate aligned expressions, only supports PrimitiveType and StructType.
For example, if attrs
- * are [a int, b int, s struct(c1 int, c2 int)] and update assignments are
[a = 1, s.c1 = 2], will
- * return [1, b, struct(2, c2)].
- * @param attrs
- * target attrs
- * @param assignments
- * update assignments
- * @return
- * aligned expressions
- */
- protected def generateAlignedExpressions(
- attrs: Seq[Attribute],
- assignments: Seq[Assignment],
- isInsert: Boolean = false): Seq[Expression] = {
- val attrUpdates = assignments.map(a => AttrUpdate(toRefSeq(a.key),
a.value))
- recursiveAlignUpdates(attrs, attrUpdates, Nil, isInsert)
- }
-
- protected def alignAssignments(
- attrs: Seq[Attribute],
- assignments: Seq[Assignment],
- isInsert: Boolean = false): Seq[Assignment] = {
- generateAlignedExpressions(attrs, assignments, isInsert).zip(attrs).map {
- case (expression, field) => Assignment(field, expression)
- }
- }
-
- /**
- * Align assignments in a MergeAction based on the target table's output
attributes.
- * - DeleteAction: returns as-is
- * - UpdateAction: aligns assignments for update
- * - InsertAction: aligns assignments for insert
- */
- protected def alignMergeAction(action: MergeAction, targetOutput:
Seq[Attribute]): MergeAction = {
- action match {
- case d @ DeleteAction(_) => d
- case u @ PaimonUpdateAction(_, assignments) =>
- u.copy(assignments = alignAssignments(targetOutput, assignments))
- case i @ InsertAction(_, assignments) =>
- i.copy(assignments = alignAssignments(targetOutput, assignments,
isInsert = true))
- case _: UpdateStarAction =>
- throw new RuntimeException("UpdateStarAction should not be here.")
- case _: InsertStarAction =>
- throw new RuntimeException("InsertStarAction should not be here.")
- case _ =>
- throw new RuntimeException(s"Can't recognize this action: $action")
- }
- }
-
- private def recursiveAlignUpdates(
- targetAttrs: Seq[NamedExpression],
- updates: Seq[AttrUpdate],
- namePrefix: Seq[String] = Nil,
- isInsert: Boolean = false): Seq[Expression] = {
-
- // build aligned updated expression for each target attr
- targetAttrs.map {
- targetAttr =>
- val headMatchedUpdates = updates.filter(u => resolver(u.ref.head,
targetAttr.name))
- if (headMatchedUpdates.isEmpty) {
- if (isInsert) {
- // For Insert, use default value or NULL for missing columns
- getDefaultValueOrNull(targetAttr)
- } else {
- // For Update, return the attr as is
- targetAttr
- }
- } else {
- val exactMatchedUpdate = headMatchedUpdates.find(_.ref.size == 1)
- if (exactMatchedUpdate.isDefined) {
- if (headMatchedUpdates.size == 1) {
- // when an exact match (no nested fields) occurs, it must be the
only match, then return it's expr
- castIfNeeded(exactMatchedUpdate.get.expr, targetAttr.dataType)
- } else {
- // otherwise, there must be conflicting updates, for example:
- // - update the same attr multiple times
- // - update a struct attr and its fields at the same time (e.g.
s and s.c1)
- val conflictingAttrNames =
- headMatchedUpdates.map(u => (namePrefix ++
u.ref).mkString(".")).distinct
- throw new UnsupportedOperationException(
- s"Conflicting update/insert on attrs:
${conflictingAttrNames.mkString(", ")}"
- )
- }
- } else {
- targetAttr.dataType match {
- case StructType(fields) =>
- val fieldExprs = fields.zipWithIndex.map {
- case (field, ordinal) =>
- Alias(GetStructField(targetAttr, ordinal,
Some(field.name)), field.name)()
- }
- val newUpdates = updates.map(u => u.copy(ref = u.ref.tail))
- // process StructType's nested fields recursively
- val updatedFieldExprs =
- recursiveAlignUpdates(
- fieldExprs,
- newUpdates,
- namePrefix :+ targetAttr.name,
- isInsert)
-
- // build updated struct expression
- CreateNamedStruct(fields.zip(updatedFieldExprs).flatMap {
- case (field, expr) =>
- Seq(Literal(field.name), expr)
- })
- case _ =>
- // can't reach here
- throw new UnsupportedOperationException("")
- }
- }
- }
- }
- }
-
- /** Get the default value expression for an attribute, or NULL if no default
value is defined. */
- private def getDefaultValueOrNull(attr: NamedExpression): Expression = {
- attr match {
- case a: Attribute if
a.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) =>
- val defaultValueStr =
a.metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)
- parseAndResolveDefaultValue(defaultValueStr, a)
- case _ =>
- Literal(null, attr.dataType)
- }
- }
-
- /** Parse the default value string and resolve it to an expression. */
- private def parseAndResolveDefaultValue(defaultValueStr: String, attr:
Attribute): Expression = {
- try {
- val spark = SparkSession.active
- val parsed =
spark.sessionState.sqlParser.parseExpression(defaultValueStr)
- castIfNeeded(parsed, attr.dataType)
- } catch {
- case _: Exception =>
- // If parsing fails, fall back to NULL
- Literal(null, attr.dataType)
- }
- }
-
-}
diff --git
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
index ba516ddbe8..4222c54d6e 100644
---
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
+++
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
@@ -71,7 +71,8 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
}
val resolvedAssignments =
resolveAssignments(resolve, assignments, merge, SOURCE_ONLY)
- UpdateAction(resolvedCond, resolvedAssignments)
+ // Tag so merge-schema can distinguish `UPDATE *` from a fully-listed
explicit clause.
+ PaimonMergeActionTags.markFromStar(UpdateAction(resolvedCond,
resolvedAssignments))
case action =>
throw new RuntimeException(s"Can't recognize this action: $action")
}
@@ -97,7 +98,7 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
}
val resolvedAssignments =
resolveAssignments(resolve, assignments, merge, SOURCE_ONLY)
- InsertAction(resolvedCond, resolvedAssignments)
+ PaimonMergeActionTags.markFromStar(InsertAction(resolvedCond,
resolvedAssignments))
case action =>
throw new RuntimeException(s"Can't recognize this action: $action")
}
diff --git
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index 3b581e325f..11fdfbd579 100644
---
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -29,10 +29,10 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{CTESubstitution,
SubstituteUnresolvedOrdinals}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef,
LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias,
UnresolvedWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment,
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable,
MergeRows, SubqueryAlias, UnresolvedWith, UpdateAction}
import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Keep
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.ArrayData
@@ -40,6 +40,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util,
Identifier, Table,
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.SparkFormatTable
import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex,
PartitionSpec}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.streaming.{FileStreamSink,
MetadataLogFileIndex}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataTypes, StructType, VariantType}
@@ -138,6 +139,25 @@ class Spark4Shim extends SparkShim {
withSchemaEvolution)
}
+ override def copyDataSourceV2Relation(
+ relation: DataSourceV2Relation,
+ table: Table,
+ output: Seq[AttributeReference]): DataSourceV2Relation = {
+ relation.copy(table = table, output = output)
+ }
+
+ override def copyUpdateAction(
+ action: UpdateAction,
+ assignments: Seq[Assignment]): UpdateAction = {
+ action.copy(assignments = assignments)
+ }
+
+ override def copyInsertAction(
+ action: InsertAction,
+ assignments: Seq[Assignment]): InsertAction = {
+ action.copy(assignments = assignments)
+ }
+
// Spark 4.0 still has `SubstituteUnresolvedOrdinals` (Spark 4.1 removed it
because the new
// resolver framework handles ordinals inline). `PaimonViewResolver` applies
the shim's early
// rules to the parsed view text before storing, so we must substitute
`ORDER BY 1` →
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
index f61ed71b31..f85baf846a 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
@@ -21,11 +21,12 @@ package org.apache.paimon.spark.catalyst.analysis
import
org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{PaimonUtils, SparkSession}
import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
CreateNamedStruct, Expression, GetStructField, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayTransform,
Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull,
LambdaFunction, Literal, MapFromArrays, MapKeys, MapValues, NamedExpression,
NamedLambdaVariable}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction,
InsertAction, InsertStarAction, MergeAction, MergeIntoTable, UpdateAction,
UpdateStarAction}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
@@ -67,26 +68,23 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with
ExpressionHelper {
}
}
- /**
- * Align assignments in a MergeAction based on the target table's output
attributes.
- * - DeleteAction: returns as-is
- * - UpdateAction: aligns assignments for update
- * - InsertAction: aligns assignments for insert
- */
+ /** Align a MergeAction's assignments to target output. Star actions must
already be expanded. */
protected def alignMergeAction(action: MergeAction, targetOutput:
Seq[Attribute]): MergeAction = {
- action match {
+ // `copyXxxAction` rebuilds the node and drops tags; re-carry FROM_STAR so
merge-schema works.
+ val aligned = action match {
case d @ DeleteAction(_) => d
case u @ PaimonUpdateAction(_, assignments) =>
- u.copy(assignments = alignAssignments(targetOutput, assignments))
+ SparkShimLoader.shim.copyUpdateAction(u,
alignAssignments(targetOutput, assignments))
case i @ InsertAction(_, assignments) =>
- i.copy(assignments = alignAssignments(targetOutput, assignments,
isInsert = true))
- case _: UpdateStarAction =>
- throw new RuntimeException("UpdateStarAction should not be here.")
- case _: InsertStarAction =>
- throw new RuntimeException("InsertStarAction should not be here.")
+ SparkShimLoader.shim.copyInsertAction(
+ i,
+ alignAssignments(targetOutput, assignments, isInsert = true))
+ case _: UpdateStarAction | _: InsertStarAction =>
+ throw new RuntimeException(s"Star action should already be expanded:
$action")
case _ =>
throw new RuntimeException(s"Can't recognize this action: $action")
}
+ PaimonMergeActionTags.carryFromStar(action, aligned)
}
private def recursiveAlignUpdates(
@@ -112,7 +110,7 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with
ExpressionHelper {
if (exactMatchedUpdate.isDefined) {
if (headMatchedUpdates.size == 1) {
// when an exact match (no nested fields) occurs, it must be the
only match, then return it's expr
- castIfNeeded(exactMatchedUpdate.get.expr, targetAttr.dataType)
+ resolveByNameAndCast(exactMatchedUpdate.get.expr,
targetAttr.dataType)
} else {
// otherwise, there must be conflicting updates, for example:
// - update the same attr multiple times
@@ -177,4 +175,76 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with
ExpressionHelper {
}
}
+ /**
+ * Resolve an assignment value expression by-name against the target type,
then cast if needed.
+ * Recursively reorders nested type fields (Struct, Array, Map and any
combination) by name to
+ * match target field order before casting. This is consistent with Spark's
native MERGE INTO
+ * behavior (see TableOutputResolver.resolveUpdate).
+ */
+ private def resolveByNameAndCast(expression: Expression, targetType:
DataType): Expression = {
+ if (PaimonUtils.sameType(expression.dataType, targetType)) {
+ // Types already structurally identical — no reordering needed.
+ // This guarantees idempotence when the rule is applied multiple times.
+ castIfNeeded(expression, targetType)
+ } else {
+ val reordered = reorderFieldsByName(expression, expression.dataType,
targetType)
+ castIfNeeded(reordered, targetType)
+ }
+ }
+
+ /**
+ * Recursively reorder nested type fields by name to match target type's
field order. Supports
+ * StructType, ArrayType and MapType in any nesting combination. Returns the
original expression
+ * if no reordering is needed.
+ */
+ private def reorderFieldsByName(
+ expression: Expression,
+ sourceType: DataType,
+ targetType: DataType): Expression = {
+ (sourceType, targetType) match {
+ case (s: StructType, t: StructType) if s != t =>
+ reorderStructByName(expression, s, t)
+ case (ArrayType(sElem, sNull), ArrayType(tElem, _)) if sElem != tElem =>
+ val elementVar = NamedLambdaVariable("element", sElem, sNull)
+ val reordered = reorderFieldsByName(elementVar, sElem, tElem)
+ ArrayTransform(expression, LambdaFunction(reordered, Seq(elementVar)))
+ case (MapType(sKey, sVal, sValNull), MapType(tKey, tVal, _))
+ if sKey != tKey || sVal != tVal =>
+ val keyVar = NamedLambdaVariable("key", sKey, nullable = false)
+ val valVar = NamedLambdaVariable("value", sVal, sValNull)
+ val reorderedKey = reorderFieldsByName(keyVar, sKey, tKey)
+ val reorderedVal = reorderFieldsByName(valVar, sVal, tVal)
+ val newKeys = ArrayTransform(MapKeys(expression),
LambdaFunction(reorderedKey, Seq(keyVar)))
+ val newVals =
+ ArrayTransform(MapValues(expression), LambdaFunction(reorderedVal,
Seq(valVar)))
+ MapFromArrays(newKeys, newVals)
+ case _ =>
+ expression
+ }
+ }
+
+ /** Reorder source struct fields to match target field order by name,
recursing into nested types. */
+ private def reorderStructByName(
+ expression: Expression,
+ sourceStruct: StructType,
+ targetStruct: StructType): Expression = {
+ val reorderedFields = targetStruct.map {
+ targetField =>
+ sourceStruct.fields.zipWithIndex.find(_._1.name == targetField.name)
match {
+ case Some((sourceField, sourceIdx)) =>
+ val fieldExpr = GetStructField(expression, sourceIdx,
Some(sourceField.name))
+ val reordered =
+ reorderFieldsByName(fieldExpr, sourceField.dataType,
targetField.dataType)
+ Alias(reordered, targetField.name)()
+ case None =>
+ Alias(Literal(null, targetField.dataType), targetField.name)()
+ }
+ }
+ val struct = CreateNamedStruct(reorderedFields.flatMap(a =>
Seq(Literal(a.name), a.child)))
+ if (expression.nullable) {
+ If(IsNull(expression), Literal(null, struct.dataType), struct)
+ } else {
+ struct
+ }
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
new file mode 100644
index 0000000000..9e44f8c311
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.paimon.spark.{SparkTable, SparkTypeUtils}
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
+import org.apache.paimon.spark.commands.SchemaHelper
+import org.apache.paimon.spark.schema.SparkSystemColumns
+import org.apache.paimon.spark.util.OptionUtils
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction,
InsertAction, MergeAction, MergeIntoTable, UpdateAction}
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Shared MERGE INTO `merge-schema=true` evolution logic. Mixed in by both the
postHoc V1 path
+ * ([[PaimonMergeIntoBase]]) and the Spark 4.1 Resolution-batch rewrite for
pure append-only tables
+ * (`Spark41MergeIntoRewrite`). Only fires when at least one action was
authored as `UPDATE *` /
+ * `INSERT *`, tracked via [[PaimonMergeActionTags]].
+ */
+trait MergeSchemaEvolutionHelper extends ExpressionHelper {
+
+ /**
+ * @param resolveNotMatchedBySource
+ * how to resolve `notMatchedBySourceActions` on Spark 3.2+/4.x; the
version-specific shim is
+ * supplied by the caller.
+ */
+ protected def evolveTargetIfNeeded(
+ merge: MergeIntoTable,
+ relation: DataSourceV2Relation,
+ v2Table: SparkTable,
+ spark: SparkSession,
+ resolveNotMatchedBySource: MergeIntoTable => Seq[MergeAction])
+ : Option[(MergeIntoTable, DataSourceV2Relation, SparkTable)] = {
+ if (!OptionUtils.writeMergeSchemaEnabled()) return None
+
+ val notMatchedBySourceActions = resolveNotMatchedBySource(merge)
+ val allActions = merge.matchedActions ++ merge.notMatchedActions ++
notMatchedBySourceActions
+ if (!allActions.exists(PaimonMergeActionTags.isFromStar)) return None
+
+ val fileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]
+ val sourceSchema = StructType(
+ merge.sourceTable.output.map(a => StructField(a.name, a.dataType,
a.nullable)))
+ val filteredSourceSchema =
SparkSystemColumns.filterSparkSystemColumns(sourceSchema)
+ val allowExplicitCast = OptionUtils.writeMergeSchemaExplicitCastEnabled()
+ val updatedFileStoreTable = SchemaHelper
+ .mergeAndCommitSchema(fileStoreTable, filteredSourceSchema,
allowExplicitCast)
+ .getOrElse(return None)
+
+ // Invalidate Spark catalog cache so subsequent queries see the new schema.
+ for (catalog <- relation.catalog; ident <- relation.identifier) {
+ catalog.asInstanceOf[TableCatalog].invalidateTable(ident)
+ }
+
+ val updatedV2Table = v2Table.copy(table = updatedFileStoreTable)
+ val resolver = spark.sessionState.conf.resolver
+ val mergedSparkSchema =
+
SparkTypeUtils.fromPaimonRowType(updatedFileStoreTable.schema().logicalRowType())
+ val newOutput = buildEvolvedOutput(mergedSparkSchema, relation.output,
resolver)
+ val updatedRelation =
+ SparkShimLoader.shim.copyDataSourceV2Relation(relation, updatedV2Table,
newOutput)
+ val updatedTargetTable = merge.targetTable.transform {
+ case r: DataSourceV2Relation if r eq relation => updatedRelation
+ }
+
+ val newAttrs = {
+ val oldNames = relation.output.map(_.name).toSet
+ newOutput.filterNot(a => oldNames.exists(resolver(_, a.name)))
+ }
+ val expand = expandAction(newAttrs, merge.sourceTable.output, resolver) _
+
+ val updatedMerge = SparkShimLoader.shim.createMergeIntoTable(
+ updatedTargetTable,
+ merge.sourceTable,
+ merge.mergeCondition,
+ merge.matchedActions.map(expand),
+ merge.notMatchedActions.map(expand),
+ notMatchedBySourceActions.map(expand),
+ withSchemaEvolution = false
+ )
+ Some((updatedMerge, updatedRelation, updatedV2Table))
+ }
+
+ /** Rebuild the relation's output: reuse existing attribute ids, fabricate
ones for new fields. */
+ private def buildEvolvedOutput(
+ mergedSparkSchema: StructType,
+ oldOutput: Seq[Attribute],
+ resolver: Resolver): Seq[AttributeReference] = {
+ mergedSparkSchema.map {
+ field =>
+ oldOutput.find(a => resolver(a.name, field.name)) match {
+ case Some(existing: AttributeReference) =>
+ existing.copy(dataType = field.dataType, nullable =
field.nullable)(
+ exprId = existing.exprId,
+ qualifier = existing.qualifier)
+ case _ => AttributeReference(field.name, field.dataType,
field.nullable)()
+ }
+ }
+ }
+
+ /**
+ * Append assignments for newly-added columns and re-tag the action. Star
clauses pull values from
+ * source by name; explicit clauses fill NULL.
+ */
+ private def expandAction(
+ newAttrs: Seq[AttributeReference],
+ sourceOutput: Seq[Attribute],
+ resolver: Resolver)(action: MergeAction): MergeAction = {
+ val fromStar = PaimonMergeActionTags.isFromStar(action)
+ val newAssignments = newAttrs.map {
+ attr =>
+ val value: Expression = if (fromStar) {
+ sourceOutput
+ .find(s => resolver(s.name, attr.name))
+ .map(s => castIfNeeded(s, attr.dataType))
+ .getOrElse(Literal(null, attr.dataType))
+ } else {
+ Literal(null, attr.dataType)
+ }
+ Assignment(attr, value)
+ }
+ val expanded = action match {
+ case i: InsertAction =>
+ SparkShimLoader.shim.copyInsertAction(i, i.assignments ++
newAssignments)
+ case u: UpdateAction =>
+ SparkShimLoader.shim.copyUpdateAction(u, u.assignments ++
newAssignments)
+ case d: DeleteAction => d
+ }
+ PaimonMergeActionTags.carryFromStar(action, expanded)
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeActionTags.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeActionTags.scala
new file mode 100644
index 0000000000..04e283319e
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeActionTags.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical.MergeAction
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+
+/**
+ * Marks a `MergeAction` that originated from `INSERT *` / `UPDATE *`. After
resolution a star
+ * action is indistinguishable from an explicit clause that happens to list
every column, but
+ * merge-schema needs the original intent to decide between pulling new source
columns and filling
+ * NULL. Tags do not survive fresh constructor calls — rebuild sites must
re-tag via
+ * `carryFromStar`.
+ */
+object PaimonMergeActionTags {
+
+ val FROM_STAR: TreeNodeTag[Boolean] =
TreeNodeTag[Boolean]("paimon.merge.fromStar")
+
+ def isFromStar(action: MergeAction): Boolean =
+ action.getTagValue(FROM_STAR).contains(true)
+
+ def markFromStar[T <: MergeAction](action: T): T = {
+ action.setTagValue(FROM_STAR, true)
+ action
+ }
+
+ def carryFromStar[T <: MergeAction](source: MergeAction, target: T): T = {
+ if (isFromStar(source)) markFromStar(target) else target
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
index 759fde6a71..747b0477a5 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
@@ -33,78 +33,85 @@ trait PaimonMergeIntoBase
extends Rule[LogicalPlan]
with RowLevelHelper
with ExpressionHelper
- with AssignmentAlignmentHelper {
+ with AssignmentAlignmentHelper
+ with MergeSchemaEvolutionHelper {
val spark: SparkSession
override val operation: RowLevelOp = MergeInto
def apply(plan: LogicalPlan): LogicalPlan = {
- // Spark 4.1 moved RewriteMergeIntoTable from the "DML rewrite" batch into the main Resolution
- // batch, which marks the plan analyzed before the Post-Hoc Resolution batch runs.
- // `plan.resolveOperators` then short-circuits on the already-analyzed MERGE node and the
- // physical planner rejects it with "Table does not support MERGE INTO TABLE". Use
- // `transformDown` (which unconditionally visits every node) guarded by
- // `AnalysisHelper.allowInvokingTransformsInAnalyzer` so the in-analyzer assertion does not
- // trip. Pure append-only Paimon tables on Spark 4.1+ are handled earlier in the Resolution
- // batch by `Spark41MergeIntoRewrite`, so by the time this postHoc rule fires the aligned
- // `MergeIntoTable` node is already a `ReplaceData` / `AppendData` plan for that case; this
- // rule only sees MERGEs that still need to become the V1 command.
+ // Spark 4.1 marks the plan analyzed before postHoc runs, so `transformDown` is needed to
+ // bypass `resolveOperators`'s short-circuit. Pure append-only tables on 4.1+ are handled
+ // earlier by `Spark41MergeIntoRewrite` and never reach here.
AnalysisHelper.allowInvokingTransformsInAnalyzer {
plan.transformDown {
case merge: MergeIntoTable
if merge.resolved &&
PaimonRelation.isPaimonTable(merge.targetTable) =>
val relation = PaimonRelation.getPaimonRelation(merge.targetTable)
- val v2Table = relation.table.asInstanceOf[SparkTable]
- val dataEvolutionEnabled = v2Table.coreOptions.dataEvolutionEnabled()
- val targetOutput = relation.output
+ var v2Table = relation.table.asInstanceOf[SparkTable]
checkPaimonTable(v2Table.getTable)
checkCondition(merge.mergeCondition)
- merge.matchedActions.flatMap(_.condition).foreach(checkCondition)
- merge.notMatchedActions.flatMap(_.condition).foreach(checkCondition)
+ (merge.matchedActions ++ merge.notMatchedActions)
+ .flatMap(_.condition)
+ .foreach(checkCondition)
- val updateActions = merge.matchedActions.collect { case a:
UpdateAction => a }
val primaryKeys = v2Table.getTable.primaryKeys().asScala.toSeq
if (primaryKeys.nonEmpty) {
+ val updateActions = merge.matchedActions.collect { case a:
UpdateAction => a }
checkUpdateActionValidity(
- AttributeSet(targetOutput),
+ AttributeSet(relation.output),
merge.mergeCondition,
updateActions,
primaryKeys)
}
- val alignedMergeIntoTable = alignMergeIntoTable(merge, targetOutput)
+ // Commit schema changes before alignment so the aligned plan sees new columns.
+ val (resolvedMerge, targetOutput) =
+ evolveTargetIfNeeded(merge, relation, v2Table, spark,
resolveNotMatchedBySourceActions)
+ .map { case (m, r, t) => v2Table = t; (m, r.output) }
+ .getOrElse((merge, relation.output))
- if (!shouldFallbackToV1MergeInto(alignedMergeIntoTable)) {
- alignedMergeIntoTable
+ val aligned = alignMergeIntoTable(resolvedMerge, targetOutput)
+
+ if (!shouldFallbackToV1MergeInto(aligned)) {
+ aligned
} else {
- if (dataEvolutionEnabled) {
- MergeIntoPaimonDataEvolutionTable(
- v2Table,
- merge.targetTable,
- merge.sourceTable,
- merge.mergeCondition,
- alignedMergeIntoTable.matchedActions,
- alignedMergeIntoTable.notMatchedActions,
- resolveNotMatchedBySourceActions(alignedMergeIntoTable)
- )
- } else {
- MergeIntoPaimonTable(
- v2Table,
- merge.targetTable,
- merge.sourceTable,
- merge.mergeCondition,
- alignedMergeIntoTable.matchedActions,
- alignedMergeIntoTable.notMatchedActions,
- resolveNotMatchedBySourceActions(alignedMergeIntoTable)
- )
- }
+ buildV1Command(v2Table, resolvedMerge, aligned)
}
}
}
}
+ private def buildV1Command(
+ v2Table: SparkTable,
+ resolvedMerge: MergeIntoTable,
+ aligned: MergeIntoTable): LogicalPlan = {
+ val notMatchedBySource = resolveNotMatchedBySourceActions(aligned)
+ if (v2Table.coreOptions.dataEvolutionEnabled()) {
+ MergeIntoPaimonDataEvolutionTable(
+ v2Table,
+ resolvedMerge.targetTable,
+ resolvedMerge.sourceTable,
+ resolvedMerge.mergeCondition,
+ aligned.matchedActions,
+ aligned.notMatchedActions,
+ notMatchedBySource
+ )
+ } else {
+ MergeIntoPaimonTable(
+ v2Table,
+ resolvedMerge.targetTable,
+ resolvedMerge.sourceTable,
+ resolvedMerge.mergeCondition,
+ aligned.matchedActions,
+ aligned.notMatchedActions,
+ notMatchedBySource
+ )
+ }
+ }
+
private def checkCondition(condition: Expression): Unit = {
if (!condition.resolved) {
throw new RuntimeException(s"Condition $condition should have been
resolved.")
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
index ba516ddbe8..4222c54d6e 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
@@ -71,7 +71,8 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
}
val resolvedAssignments =
resolveAssignments(resolve, assignments, merge, SOURCE_ONLY)
- UpdateAction(resolvedCond, resolvedAssignments)
+ // Tag so merge-schema can distinguish `UPDATE *` from a fully-listed explicit clause.
+ PaimonMergeActionTags.markFromStar(UpdateAction(resolvedCond,
resolvedAssignments))
case action =>
throw new RuntimeException(s"Can't recognize this action: $action")
}
@@ -97,7 +98,7 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
}
val resolvedAssignments =
resolveAssignments(resolve, assignments, merge, SOURCE_ONLY)
- InsertAction(resolvedCond, resolvedAssignments)
+ PaimonMergeActionTags.markFromStar(InsertAction(resolvedCond,
resolvedAssignments))
case action =>
throw new RuntimeException(s"Can't recognize this action: $action")
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
index db77b19ca0..1416602a5f 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
@@ -26,8 +26,8 @@ import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.types.RowType
import org.apache.spark.sql.{Column, DataFrame, PaimonUtils, SparkSession}
-import org.apache.spark.sql.functions.{col, lit, struct}
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.functions.{col, lit, struct, transform,
transform_values}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField,
StructType}
import scala.collection.JavaConverters._
@@ -41,21 +41,66 @@ private[spark] trait SchemaHelper extends
WithFileStoreTable {
def mergeSchema(sparkSession: SparkSession, input: DataFrame, options:
Options): DataFrame = {
val dataSchema = SparkSystemColumns.filterSparkSystemColumns(input.schema)
- val newTableSchema = mergeSchema(input.schema, options)
- if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {
+ val writeSchema = mergeSchema(dataSchema, options)
+ if (!PaimonUtils.sameType(writeSchema, dataSchema)) {
val resolve = sparkSession.sessionState.conf.resolver
- val cols = alignColumns(newTableSchema, dataSchema, resolve)
+ val cols = SchemaHelper.alignColumns(writeSchema, dataSchema, resolve)
input.select(cols: _*)
} else {
input
}
}
+ def mergeSchema(dataSchema: StructType, options: Options): StructType = {
+ val mergeSchemaEnabled =
+ options.get(SparkConnectorOptions.MERGE_SCHEMA) ||
OptionUtils.writeMergeSchemaEnabled()
+ if (!mergeSchemaEnabled) {
+ return dataSchema
+ }
+
+ val filteredDataSchema =
SparkSystemColumns.filterSparkSystemColumns(dataSchema)
+ val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)
|| OptionUtils
+ .writeMergeSchemaExplicitCastEnabled()
+ SchemaHelper.mergeAndCommitSchema(table, filteredDataSchema,
allowExplicitCast).foreach {
+ updatedTable => newTable = Some(updatedTable)
+ }
+
+ val writeSchema =
SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
+ if (!PaimonUtils.sameType(writeSchema, filteredDataSchema)) {
+ writeSchema
+ } else {
+ filteredDataSchema
+ }
+ }
+
+ def updateTableWithOptions(options: Map[String, String]): Unit = {
+ newTable = Some(table.copy(options.asJava))
+ }
+}
+
+private[spark] object SchemaHelper {
+
+ /**
+ * Merge the given dataSchema into the table's schema. If the schema changed, commit the change
+ * and return the updated table; otherwise return None.
+ */
+ def mergeAndCommitSchema(
+ table: FileStoreTable,
+ dataSchema: StructType,
+ allowExplicitCast: Boolean): Option[FileStoreTable] = {
+ val dataRowType =
SparkTypeUtils.toPaimonType(dataSchema).asInstanceOf[RowType]
+ if (table.store().mergeSchema(dataRowType, allowExplicitCast)) {
+ Some(table.copyWithLatestSchema())
+ } else {
+ None
+ }
+ }
+
/**
* Recursively align columns from dataSchema to targetSchema by name. For nested struct fields,
* reorder and fill nulls for missing sub-fields.
*/
- private def alignColumns(
+ def alignColumns(
targetSchema: StructType,
dataSchema: StructType,
resolve: (String, String) => Boolean): Seq[Column] = {
@@ -77,54 +122,38 @@ private[spark] trait SchemaHelper extends
WithFileStoreTable {
resolve: (String, String) => Boolean): Column = {
(sourceType, targetField.dataType) match {
case (s: StructType, t: StructType) if !PaimonUtils.sameType(s, t) =>
- val subCols = t.map {
- subTargetField =>
- s.find(f => resolve(f.name, subTargetField.name)) match {
- case Some(subDataField) =>
- alignColumn(
- sourceCol.getField(subDataField.name),
- subDataField.dataType,
- subTargetField,
- resolve)
- case _ =>
- lit(null).cast(subTargetField.dataType).as(subTargetField.name)
- }
- }
- struct(subCols: _*).as(targetField.name)
+ alignStruct(sourceCol, s, t, resolve).as(targetField.name)
+ case (ArrayType(s: StructType, _), ArrayType(t: StructType, _))
+ if !PaimonUtils.sameType(s, t) =>
+ transform(sourceCol, elem => alignStruct(elem, s, t, resolve))
+ .as(targetField.name)
+ case (MapType(sKey, sVal: StructType, _), MapType(tKey, tVal:
StructType, _))
+ if !PaimonUtils.sameType(sVal, tVal) =>
+ transform_values(sourceCol, (_, v) => alignStruct(v, sVal, tVal,
resolve))
+ .as(targetField.name)
case _ =>
sourceCol.as(targetField.name)
}
}
- def mergeSchema(dataSchema: StructType, options: Options): StructType = {
- val mergeSchemaEnabled =
- options.get(SparkConnectorOptions.MERGE_SCHEMA) ||
OptionUtils.writeMergeSchemaEnabled()
- if (!mergeSchemaEnabled) {
- return dataSchema
- }
-
- val filteredDataSchema =
SparkSystemColumns.filterSparkSystemColumns(dataSchema)
- val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)
|| OptionUtils
- .writeMergeSchemaExplicitCastEnabled()
- mergeAndCommitSchema(filteredDataSchema, allowExplicitCast)
-
- val writeSchema =
SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
-
- if (!PaimonUtils.sameType(writeSchema, filteredDataSchema)) {
- writeSchema
- } else {
- filteredDataSchema
- }
- }
-
- private def mergeAndCommitSchema(dataSchema: StructType, allowExplicitCast:
Boolean): Unit = {
- val dataRowType =
SparkTypeUtils.toPaimonType(dataSchema).asInstanceOf[RowType]
- if (table.store().mergeSchema(dataRowType, allowExplicitCast)) {
- newTable = Some(table.copyWithLatestSchema())
+ private def alignStruct(
+ sourceCol: Column,
+ sourceType: StructType,
+ targetType: StructType,
+ resolve: (String, String) => Boolean): Column = {
+ val subCols = targetType.map {
+ subTargetField =>
+ sourceType.find(f => resolve(f.name, subTargetField.name)) match {
+ case Some(subDataField) =>
+ alignColumn(
+ sourceCol.getField(subDataField.name),
+ subDataField.dataType,
+ subTargetField,
+ resolve)
+ case _ =>
+ lit(null).cast(subTargetField.dataType).as(subTargetField.name)
+ }
}
- }
-
- def updateTableWithOptions(options: Map[String, String]): Unit = {
- newTable = Some(table.copy(options.asJava))
+ struct(subCols: _*)
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index 7a541a4514..3ceb494396 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -27,11 +27,12 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{CTERelationRef,
LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, UnresolvedWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment,
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable,
SubqueryAlias, UnresolvedWith, UpdateAction}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.StructType
import java.util.{Map => JMap}
@@ -88,6 +89,16 @@ trait SparkShim {
notMatchedBySourceActions: Seq[MergeAction],
withSchemaEvolution: Boolean): MergeIntoTable
+ def copyDataSourceV2Relation(
+ relation: DataSourceV2Relation,
+ table: Table,
+ output:
Seq[org.apache.spark.sql.catalyst.expressions.AttributeReference])
+ : DataSourceV2Relation
+
+ def copyUpdateAction(action: UpdateAction, assignments: Seq[Assignment]):
UpdateAction
+
+ def copyInsertAction(action: InsertAction, assignments: Seq[Assignment]):
InsertAction
+
/**
* Returns the list of "early" substitution rules Paimon needs to apply on a
parsed view plan.
* Spark 3.x exposes both `CTESubstitution` and
`SubstituteUnresolvedOrdinals`, but 4.1 removed
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala
index 2c46af0aed..d2c822789c 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala
@@ -183,4 +183,41 @@ trait MergeIntoNotMatchedBySourceTest extends
PaimonSparkTestBase with PaimonTab
)
}
}
+
+ test("Paimon MergeInto: merge-schema with not matched by source") {
+ withTable("source", "target") {
+ spark.conf.set("spark.paimon.write.merge-schema", "true")
+ try {
+ createTable("target", "a INT, b STRING", Seq("a"))
+ spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2'), (3, 'v3')")
+
+ createTable("source", "a INT, b STRING, c INT", Seq("a"))
+ spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (4, 'u4', 40)")
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.a = source.a
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |WHEN NOT MATCHED BY SOURCE AND a = 2 THEN
+ | UPDATE SET b = 'updated'
+ |WHEN NOT MATCHED BY SOURCE THEN
+ | DELETE
+ |""".stripMargin)
+
+ // a=1: matched, UPDATE SET * => (1, 'u1', 10)
+ // a=2: not matched by source and a = 2, UPDATE SET b='updated' => (2, 'updated', null)
+ // a=3: not matched by source, DELETE => removed
+ // a=4: not matched, INSERT * => (4, 'u4', 40)
+ checkAnswer(
+ spark.sql("SELECT * FROM target ORDER BY a"),
+ Seq(Row(1, "u1", 10), Row(2, "updated", null), Row(4, "u4", 40)))
+ } finally {
+ spark.conf.unset("spark.paimon.write.merge-schema")
+ }
+ }
+ }
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index 6c7ee12761..b00e911c07 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -783,6 +783,483 @@ abstract class MergeIntoTableTestBase extends
PaimonSparkTestBase with PaimonTab
checkAnswer(sql("SELECT * FROM target ORDER BY a, b"), Seq(Row(1,
"Eve"), Row(2, "Bob")))
}
}
+
+ test("Paimon MergeInto: merge-schema with explicit update columns") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ createTable("target", "a INT, b STRING", Seq("a"))
+ spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+ createTable("source", "a INT, b STRING, c INT", Seq("a"))
+ spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+ // No star action — schema must NOT evolve, c is never added.
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.a = source.a
+ |WHEN MATCHED THEN
+ |UPDATE SET b = source.b
+ |WHEN NOT MATCHED THEN
+ |INSERT (a, b) VALUES (source.a, source.b)
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT * FROM target ORDER BY a"),
+ Seq(Row(1, "u1"), Row(2, "v2"), Row(3, "u3")))
+ }
+ }
+ }
+
+ test("Paimon MergeInto: merge-schema with partial explicit update only (no
star semantics)") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ createTable("target", "a INT, b STRING", Seq("a"))
+ spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+ createTable("source", "a INT, b STRING, c INT", Seq("a"))
+ spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+ // Single MATCHED clause without star — schema stays.
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.a = source.a
+ |WHEN MATCHED THEN
+ |UPDATE SET b = source.b
+ |""".stripMargin)
+
+ checkAnswer(spark.sql("SELECT * FROM target ORDER BY a"), Seq(Row(1,
"u1"), Row(2, "v2")))
+ }
+ }
+ }
+
+ test("Paimon MergeInto: merge-schema with star update and explicit insert") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ createTable("target", "a INT, b STRING", Seq("a"))
+ spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+ createTable("source", "a INT, b STRING, c INT", Seq("a"))
+ spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.a = source.a
+ |WHEN MATCHED THEN
+ |UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ |INSERT (a, b) VALUES (source.a, source.b)
+ |""".stripMargin)
+
+ // UPDATE * evolves schema; explicit INSERT does not reference c, so a=3 gets c=null.
+ checkAnswer(
+ spark.sql("SELECT * FROM target ORDER BY a"),
+ Seq(Row(1, "u1", 10), Row(2, "v2", null), Row(3, "u3", null)))
+ }
+ }
+ }
+
+ test("Paimon MergeInto: merge-schema with explicit update and star insert") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ createTable("target", "a INT, b STRING", Seq("a"))
+ spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+ createTable("source", "a INT, b STRING, c INT", Seq("a"))
+ spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.a = source.a
+ |WHEN MATCHED THEN
+ |UPDATE SET b = source.b
+ |WHEN NOT MATCHED THEN
+ |INSERT *
+ |""".stripMargin)
+
+ // INSERT * evolves schema; explicit UPDATE only touches b, so a=1 keeps c=null.
+ checkAnswer(
+ spark.sql("SELECT * FROM target ORDER BY a"),
+ Seq(Row(1, "u1", null), Row(2, "v2", null), Row(3, "u3", 30)))
+ }
+ }
+ }
+
+ test("Paimon MergeInto: merge-schema with star update and insert") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ createTable("target", "a INT, b STRING", Seq("a"))
+ spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+ createTable("source", "a INT, b STRING, c INT", Seq("a"))
+ spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.a = source.a
+ |WHEN MATCHED THEN
+ |UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ |INSERT *
+ |""".stripMargin)
+
+ // UPDATE SET * updates all columns including new column c from source.
+ checkAnswer(
+ spark.sql("SELECT * FROM target ORDER BY a"),
+ Seq(Row(1, "u1", 10), Row(2, "v2", null), Row(3, "u3", 30)))
+ }
+ }
+ }
+
+ test("Paimon MergeInto: merge-schema disabled should not add new columns") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "false") {
+ createTable("target", "a INT, b STRING", Seq("a"))
+ spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+ createTable("source", "a INT, b STRING, c INT", Seq("a"))
+ spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.a = source.a
+ |WHEN MATCHED THEN
+ |UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ |INSERT *
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT * FROM target ORDER BY a"),
+ Seq(Row(1, "u1"), Row(2, "v2"), Row(3, "u3")))
+ }
+ }
+ }
+
+ test("Paimon MergeInto: merge-schema with DV table") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ createTable(
+ "target",
+ "a INT, b STRING",
+ Seq("a"),
+ extraProps = Map("deletion-vectors.enabled" -> "true"))
+ spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+ createTable("source", "a INT, b STRING, c INT", Seq("a"))
+ spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.a = source.a
+ |WHEN MATCHED THEN
+ |UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ |INSERT *
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT * FROM target ORDER BY a"),
+ Seq(Row(1, "u1", 10), Row(2, "v2", null), Row(3, "u3", 30)))
+ }
+ }
+ }
+
+ test("Paimon MergeInto: merge-schema with nested struct new fields") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ createTable("target", "a INT, info STRUCT<f1 STRING, f2 STRING>",
Seq("a"))
+ spark.sql("INSERT INTO target VALUES (1, struct('v1a', 'v2a')), (2,
struct('v1b', 'v2b'))")
+
+ createTable("source", "a INT, info STRUCT<f1 STRING, f2 STRING, f3
STRING>", Seq("a"))
+ spark.sql(
+ "INSERT INTO source VALUES (1, struct('u1a', 'u2a', 'u3a')), (3,
struct('u1c', 'u2c', 'u3c'))")
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.a = source.a
+ |WHEN MATCHED THEN
+ |UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ |INSERT *
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT * FROM target ORDER BY a"),
+ Seq(
+ Row(1, Row("u1a", "u2a", "u3a")),
+ Row(2, Row("v1b", "v2b", null)),
+ Row(3, Row("u1c", "u2c", "u3c")))
+ )
+ }
+ }
+ }
+
+ test("Paimon MergeInto: merge-schema with new top-level and nested fields") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ createTable("target", "a INT, info STRUCT<f1 STRING>", Seq("a"))
+ spark.sql("INSERT INTO target VALUES (1, struct('v1')), (2,
struct('v2'))")
+
+ createTable("source", "a INT, info STRUCT<f1 STRING, f2 STRING>, extra
STRING", Seq("a"))
+ spark.sql(
+ "INSERT INTO source VALUES (1, struct('u1', 'new1'), 'e1'), (3,
struct('u3', 'new3'), 'e3')")
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.a = source.a
+ |WHEN MATCHED THEN
+ |UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ |INSERT *
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT * FROM target ORDER BY a"),
+ Seq(
+ Row(1, Row("u1", "new1"), "e1"),
+ Row(2, Row("v2", null), null),
+ Row(3, Row("u3", "new3"), "e3"))
+ )
+ }
+ }
+ }
+
+ test("Paimon MergeInto: merge-schema complex scenario with multiple clauses
and conditions") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ createTable("target", "id INT, name STRING, score INT", Seq("id"))
+ spark.sql(
+ "INSERT INTO target VALUES (1, 'Alice', 80), (2, 'Bob', 70), (3,
'Carol', 60), (4, 'Dave', 50)")
+
+ createTable(
+ "source",
+ "id INT, name STRING, score INT, grade STRING, info STRUCT<city
STRING, zip STRING>",
+ Seq("id"))
+ spark.sql("""INSERT INTO source VALUES
+ |(1, 'Alice_v2', 95, 'A', struct('Beijing', '100000')),
+ |(2, 'Bob_v2', 55, 'C', struct('Shanghai', '200000')),
+ |(5, 'Eve', 88, 'B', struct('Hangzhou', '310000')),
+ |(6, 'Frank', 42, 'D', struct('Nanjing',
'210000'))""".stripMargin)
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.id = source.id
+ |WHEN MATCHED AND source.score >= 60 THEN
+ | UPDATE SET *
+ |WHEN MATCHED AND source.score < 60 THEN
+ | DELETE
+ |WHEN NOT MATCHED AND source.score >= 50 THEN
+ | INSERT *
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT * FROM target ORDER BY id"),
+ Seq(
+ Row(1, "Alice_v2", 95, "A", Row("Beijing", "100000")),
+ Row(3, "Carol", 60, null, null),
+ Row(4, "Dave", 50, null, null),
+ Row(5, "Eve", 88, "B", Row("Hangzhou", "310000"))
+ )
+ )
+ }
+ }
+ }
+
+ test("Paimon MergeInto: struct field reorder by name without merge-schema") {
+ withTable("source", "target") {
+ createTable(
+ "target",
+ "id INT, name STRING, address STRUCT<city STRING, zip STRING>",
+ Seq("id"))
+ spark.sql("""INSERT INTO target VALUES
+ |(1, 'Alice', struct('Shanghai', '200000')),
+ |(2, 'Bob', struct('Beijing', '100000'))""".stripMargin)
+
+ // Source struct has sub-fields in reversed order — alignment must be by name.
+ createTable(
+ "source",
+ "id INT, name STRING, address STRUCT<zip STRING, city STRING>",
+ Seq("id"))
+ spark.sql("""INSERT INTO source VALUES
+ |(1, 'Alice_v2', struct('210000', 'Nanjing')),
+ |(3, 'Carol', struct('310000', 'Hangzhou'))""".stripMargin)
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.id = source.id
+ |WHEN MATCHED THEN
+ |UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ |INSERT *
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT id, name, address FROM target ORDER BY id"),
+ Seq(
+ Row(1, "Alice_v2", Row("Nanjing", "210000")),
+ Row(2, "Bob", Row("Beijing", "100000")),
+ Row(3, "Carol", Row("Hangzhou", "310000"))
+ )
+ )
+ }
+ }
+
+ test("Paimon MergeInto: merge-schema with struct field change") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ createTable(
+ "target",
+ "id INT, name STRING, address STRUCT<city STRING, zip STRING>",
+ Seq("id"))
+ spark.sql("""INSERT INTO target VALUES
+ |(1, 'Alice', struct('Shanghai', '200000')),
+ |(2, 'Bob', struct('Beijing', '100000'))""".stripMargin)
+
+ // Source struct: reversed order (zip, city) + new sub-field 'country' + new top col 'extra'.
+ createTable(
+ "source",
+ "id INT, name STRING, address STRUCT<zip STRING, city STRING,
country STRING>, extra STRING",
+ Seq("id"))
+ spark.sql("""INSERT INTO source VALUES
+ |(1, 'Alice_v2', struct('210000', 'Nanjing', 'CN'), 'e1'),
+ |(3, 'Carol', struct('310000', 'Hangzhou', 'CN'),
'e3')""".stripMargin)
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.id = source.id
+ |WHEN MATCHED THEN
+ |UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ |INSERT *
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT id, name, address, extra FROM target ORDER BY id"),
+ Seq(
+ Row(1, "Alice_v2", Row("Nanjing", "210000", "CN"), "e1"),
+ Row(2, "Bob", Row("Beijing", "100000", null), null),
+ Row(3, "Carol", Row("Hangzhou", "310000", "CN"), "e3")
+ )
+ )
+ }
+ }
+ }
+
+ test("Paimon MergeInto: merge-schema with array of struct field reorder and
new top-level col") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ createTable(
+ "target",
+ "id INT, name STRING, items ARRAY<STRUCT<code STRING, quantity
INT>>",
+ Seq("id"))
+ spark.sql("""INSERT INTO target VALUES
+ |(1, 'Alice', array(struct('A01', 10), struct('A02', 20))),
+ |(2, 'Bob', array(struct('B01', 30)))""".stripMargin)
+
+ // Source array<struct>: reversed sub-field order (quantity, code) + new top col 'extra'.
+ createTable(
+ "source",
+ "id INT, name STRING, items ARRAY<STRUCT<quantity INT, code
STRING>>, extra STRING",
+ Seq("id"))
+ spark.sql("""INSERT INTO source VALUES
+ |(1, 'Alice_v2', array(struct(15, 'A01')), 'e1'),
+ |(3, 'Carol', array(struct(5, 'C01'), struct(8, 'C02')),
'e3')""".stripMargin)
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.id = source.id
+ |WHEN MATCHED THEN
+ |UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ |INSERT *
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT id, name, items, extra FROM target ORDER BY id"),
+ Seq(
+ Row(1, "Alice_v2", Seq(Row("A01", 15)), "e1"),
+ Row(2, "Bob", Seq(Row("B01", 30)), null),
+ Row(3, "Carol", Seq(Row("C01", 5), Row("C02", 8)), "e3")
+ )
+ )
+ }
+ }
+ }
+
+ test("Paimon MergeInto: struct field reorder when target has fields absent
from source") {
+ withTable("source", "target") {
+ // Target struct has 3 sub-fields; source struct only has 2 of them in a different order.
+ createTable("target", "id INT, info STRUCT<x INT, y INT, z INT>",
Seq("id"))
+ spark.sql("INSERT INTO target VALUES (1, struct(1, 2, 3)), (2, struct(4,
5, 6))")
+
+ createTable("source", "id INT, info STRUCT<y INT, x INT>", Seq("id"))
+ spark.sql("INSERT INTO source VALUES (1, struct(20, 10)), (3, struct(80,
70))")
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.id = source.id
+ |WHEN MATCHED THEN
+ |UPDATE SET target.info = source.info
+ |WHEN NOT MATCHED THEN
+ |INSERT (id, info) VALUES (source.id, source.info)
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT * FROM target ORDER BY id"),
+ Seq(
+ Row(1, Row(10, 20, null)),
+ Row(2, Row(4, 5, 6)),
+ Row(3, Row(70, 80, null))
+ )
+ )
+ }
+ }
+
+ test("Paimon MergeInto: map with struct value field reorder") {
+ withTable("source", "target") {
+ createTable("target", "id INT, props MAP<STRING, STRUCT<a INT, b
STRING>>", Seq("id"))
+ spark.sql(
+ "INSERT INTO target VALUES (1, map('k1', struct(1, 'v1'))), (2,
map('k2', struct(2, 'v2')))")
+
+ // Source map value struct has reversed sub-field order (b, a).
+ createTable("source", "id INT, props MAP<STRING, STRUCT<b STRING, a
INT>>", Seq("id"))
+ spark.sql(
+ "INSERT INTO source VALUES (1, map('k1', struct('u1', 10))), (3,
map('k3', struct('u3', 30)))")
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.id = source.id
+ |WHEN MATCHED THEN
+ |UPDATE SET target.props = source.props
+ |WHEN NOT MATCHED THEN
+ |INSERT (id, props) VALUES (source.id, source.props)
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT * FROM target ORDER BY id"),
+ Seq(
+ Row(1, Map("k1" -> Row(10, "u1"))),
+ Row(2, Map("k2" -> Row(2, "v2"))),
+ Row(3, Map("k3" -> Row(30, "u3")))
+ )
+ )
+ }
+ }
}
trait MergeIntoPrimaryKeyTableTest extends PaimonSparkTestBase with
PaimonPrimaryKeyTable {
@@ -829,6 +1306,8 @@ trait MergeIntoPrimaryKeyTableTest extends
PaimonSparkTestBase with PaimonPrimar
trait MergeIntoAppendTableTest extends PaimonSparkTestBase with
PaimonAppendTable {
+ import testImplicits._
+
test("Paimon MergeInto: non pk table commit kind") {
withTable("s", "t") {
createTable("s", "id INT, b INT, c INT", Seq("id"))
@@ -970,6 +1449,37 @@ trait MergeIntoAppendTableTest extends
PaimonSparkTestBase with PaimonAppendTabl
}
}
+  // Verifies that MERGE INTO writes a STRING source column into the INT target column
+  // `status_id` through both the UPDATE and the INSERT clause.
+  test("Paimon MergeInto: type cast STRING to INT in update and insert") {
+    withTable("target") {
+      createTable("target", "id INT, name STRING, status_id INT", Seq("id"))
+      spark.sql("INSERT INTO target VALUES (1, 'Alice', 100), (2, 'Bob', 200), (3, 'Charlie', 300)")
+
+      // `status_id` is a String in the source rows, while the target column is INT.
+      Seq((1, "Alice_Updated", "111"), (2, "Bob_Updated", "222"), (4, "David_New", "400"))
+        .toDF("id", "name", "status_id")
+        .createTempView("source")
+      try {
+        spark.sql(
+          """
+            |MERGE INTO target
+            |USING source
+            |ON target.id = source.id
+            |WHEN MATCHED THEN
+            | UPDATE SET target.id = source.id, target.name = source.name, target.status_id = source.status_id
+            |WHEN NOT MATCHED THEN
+            | INSERT (id, name, status_id) VALUES (source.id, source.name, source.status_id)
+            |""".stripMargin)
+
+        // ids 1 and 2 are updated, id 3 is left untouched, id 4 is inserted; `status_id`
+        // comes back as an Int in every row.
+        checkAnswer(
+          spark.sql("SELECT * FROM target ORDER BY id"),
+          Row(1, "Alice_Updated", 111) :: Row(2, "Bob_Updated", 222) ::
+            Row(3, "Charlie", 300) :: Row(4, "David_New", 400) :: Nil
+        )
+      } finally {
+        // `source` is a temp view that is not listed in `withTable`, so it is dropped here.
+        spark.catalog.dropTempView("source")
+      }
+    }
+  }
+
def createPositiveRandomInt(): Int = {
val random = new Random()
val positiveInt = random.nextInt()
diff --git
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index 5f989f525d..c71b3df923 100644
---
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -29,10 +29,10 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{CTESubstitution,
SubstituteUnresolvedOrdinals}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef,
LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, UnresolvedWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment,
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable,
SubqueryAlias, UnresolvedWith, UpdateAction}
// NOTE: `MergeRows` / `MergeRows.Keep` were introduced in Spark 3.4. We access them only via
// reflection inside the `mergeRowsKeep*` method bodies so that loading `Spark3Shim` does not fail
// on Spark 3.2 / 3.3 runtimes that still ship `paimon-spark3-common` (the module targets 3.5.8 at
@@ -43,6 +43,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier,
Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.SparkFormatTable
import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex,
PartitionSpec}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.streaming.{FileStreamSink,
MetadataLogFileIndex}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -119,6 +120,25 @@ class Spark3Shim extends SparkShim {
notMatchedBySourceActions)
}
+  /** Returns a copy of `relation` with its `table` and `output` replaced by the given values. */
+  override def copyDataSourceV2Relation(
+      relation: DataSourceV2Relation,
+      table: Table,
+      output: Seq[AttributeReference]): DataSourceV2Relation =
+    relation.copy(table = table, output = output)
+
+  /** Returns a copy of `action` whose assignments are replaced by `assignments`. */
+  override def copyUpdateAction(
+      action: UpdateAction,
+      assignments: Seq[Assignment]): UpdateAction =
+    action.copy(assignments = assignments)
+
+  /** Returns a copy of `action` whose assignments are replaced by `assignments`. */
+  override def copyInsertAction(
+      action: InsertAction,
+      assignments: Seq[Assignment]): InsertAction =
+    action.copy(assignments = assignments)
+
// Returns the rules `CTESubstitution` and `SubstituteUnresolvedOrdinals`, in that order.
override def earlyBatchRules(): Seq[Rule[LogicalPlan]] =
Seq(CTESubstitution, SubstituteUnresolvedOrdinals)
diff --git
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
index 6b50691717..921092572e 100644
---
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
+++
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
@@ -18,9 +18,10 @@
package org.apache.spark.sql.catalyst.analysis
-import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.catalyst.analysis.{AssignmentAlignmentHelper,
MergeSchemaEvolutionHelper, PaimonRelation}
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute,
AttributeReference, Exists, Expression, IsNotNull, Literal, MetadataAttribute,
MonotonicallyIncreasingID, OuterReference, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral,
TrueLiteral}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -80,6 +81,7 @@ object Spark41MergeIntoRewrite
extends RewriteRowLevelCommand
with PredicateHelper
with AssignmentAlignmentHelper
+ with MergeSchemaEvolutionHelper
with PureAppendOnlyScope {
final private val ROW_FROM_SOURCE = "__row_from_source"
@@ -92,11 +94,21 @@ object Spark41MergeIntoRewrite
case m: MergeIntoTable
if m.resolved && m.rewritable && !m.needSchemaEvolution &&
targetsPureAppendOnly(m.targetTable) =>
- rewrite(alignMergeIntoTable(m))
+ // Pure append-only tables never reach the postHoc `PaimonMergeIntoBase`, so evolve here.
+ rewrite(alignMergeIntoTable(evolveSchemaIfPaimon(m)))
}
}
}
+  /**
+   * Applies `evolveTargetIfNeeded` to `m` when its target is a Paimon table.
+   *
+   * Returns the first element of the value produced by `evolveTargetIfNeeded`, or `m` unchanged
+   * when the target is not a Paimon table or when `evolveTargetIfNeeded` produces nothing.
+   */
+  private def evolveSchemaIfPaimon(m: MergeIntoTable): MergeIntoTable = {
+    if (PaimonRelation.isPaimonTable(m.targetTable)) {
+      val paimonRelation = PaimonRelation.getPaimonRelation(m.targetTable)
+      // NOTE(review): assumes the table behind a Paimon relation is always a `SparkTable`;
+      // the cast fails otherwise -- confirm against `PaimonRelation.isPaimonTable`.
+      val sparkTable = paimonRelation.table.asInstanceOf[SparkTable]
+      val evolved = evolveTargetIfNeeded(
+        m,
+        paimonRelation,
+        sparkTable,
+        SparkSession.active,
+        _.notMatchedBySourceActions)
+      evolved.map(_._1).getOrElse(m)
+    } else {
+      m
+    }
+  }
+
/*
-------------------------------------------------------------------------------------------
*
* Dispatcher mirroring `RewriteMergeIntoTable.apply`'s three
`ReplaceData`/`AppendData`
* branches. The `SupportsDelta` branch from Spark is intentionally omitted.
diff --git
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index 01b6d8d79b..b66b896927 100644
---
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -29,10 +29,10 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.CTESubstitution
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef,
LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias,
UnresolvedWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment,
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable,
MergeRows, SubqueryAlias, UnresolvedWith, UpdateAction}
import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Copy, Insert,
Keep, Update}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.ArrayData
@@ -40,6 +40,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util,
Identifier, Table,
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.SparkFormatTable
import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex,
PartitionSpec}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex
import org.apache.spark.sql.execution.streaming.sinks.FileStreamSink
import org.apache.spark.sql.internal.SQLConf
@@ -122,6 +123,25 @@ class Spark4Shim extends SparkShim {
withSchemaEvolution)
}
+  /** Returns a copy of `relation` with its `table` and `output` replaced by the given values. */
+  override def copyDataSourceV2Relation(
+      relation: DataSourceV2Relation,
+      table: Table,
+      output: Seq[AttributeReference]): DataSourceV2Relation =
+    relation.copy(table = table, output = output)
+
+  /** Returns a copy of `action` whose assignments are replaced by `assignments`. */
+  override def copyUpdateAction(
+      action: UpdateAction,
+      assignments: Seq[Assignment]): UpdateAction =
+    action.copy(assignments = assignments)
+
+  /** Returns a copy of `action` whose assignments are replaced by `assignments`. */
+  override def copyInsertAction(
+      action: InsertAction,
+      assignments: Seq[Assignment]): InsertAction =
+    action.copy(assignments = assignments)
+
// Returns a single-element rule list containing `CTESubstitution`.
override def earlyBatchRules(): Seq[Rule[LogicalPlan]] = Seq(CTESubstitution)
override def mergeRowsKeepCopy(condition: Expression, output:
Seq[Expression]): AnyRef =