This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 35bd94c60d [spark] Support partial insert for data evolution merge into (#6149) 35bd94c60d is described below commit 35bd94c60d819caf862fa27d069cb4b5399f23e3 Author: YeJunHao <41894543+leaves12...@users.noreply.github.com> AuthorDate: Tue Aug 26 14:45:00 2025 +0800 [spark] Support partial insert for data evolution merge into (#6149) --- .../catalyst/analysis/PaimonMergeIntoBase.scala | 3 -- .../MergeIntoPaimonDataEvolutionTable.scala | 11 +++++-- .../paimon/spark/sql/RowLineageTestBase.scala | 37 ++++++++++++++++++++++ 3 files changed, 46 insertions(+), 5 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala index ea69d32751..8a52273eea 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala @@ -110,9 +110,6 @@ trait PaimonMergeIntoBase u.copy(assignments = alignAssignments(targetOutput, assignments)) case i @ InsertAction(_, assignments) => - if (assignments.length != targetOutput.length && dataEvolutionEnabled) { - throw new RuntimeException("Can't align the table's columns in insert clause.") - } i.copy(assignments = alignAssignments(targetOutput, assignments)) case _: UpdateStarAction => diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index b5aa82f148..d26426d6c4 100644 --- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -31,7 +31,7 @@ import org.apache.paimon.table.source.DataSplit import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.sql.PaimonUtils._ import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, Literal} import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter} import org.apache.spark.sql.catalyst.plans.logical._ @@ -258,7 +258,14 @@ case class MergeIntoPaimonDataEvolutionTable( case insertAction: InsertAction => Keep( insertAction.condition.getOrElse(TrueLiteral), - insertAction.assignments.map(a => a.value)) + insertAction.assignments.map( + a => + if ( + !a.value.isInstanceOf[AttributeReference] || joinPlan.output.exists( + attr => attr.toString().equals(a.value.toString())) + ) a.value + else Literal(null)) + ) }.toSeq, notMatchedBySourceInstructions = Nil, checkCardinality = false, diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala index da43a2e91b..d8a3516a80 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala @@ -221,6 +221,43 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase { } } + test("Data Evolution: insert into table with data-evolution partial insert") { + withTable("s", 
"t") { + sql("CREATE TABLE s (id INT, b INT)") + sql("INSERT INTO s VALUES (1, 11), (2, 22)") + + sql( + "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c FROM range(2, 4)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT (id, b) VALUES (-1, b) + |""".stripMargin) + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT (b) VALUES (b) + |""".stripMargin) + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT (id, c) VALUES (3, 4) + |""".stripMargin) + + checkAnswer( + sql("SELECT * FROM t ORDER BY id"), + Seq(Row(null, 11, null), Row(-1, 11, null), Row(2, 2, 2), Row(3, 3, 3), Row(3, null, 4)) + ) + } + } + test("Data Evolution: merge into table with data-evolution") { withTable("s", "t") { sql("CREATE TABLE s (id INT, b INT)")