This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 35bd94c60d [spark] Support partial insert for data evolution merge into (#6149)
35bd94c60d is described below

commit 35bd94c60d819caf862fa27d069cb4b5399f23e3
Author: YeJunHao <41894543+leaves12...@users.noreply.github.com>
AuthorDate: Tue Aug 26 14:45:00 2025 +0800

    [spark] Support partial insert for data evolution merge into (#6149)
---
 .../catalyst/analysis/PaimonMergeIntoBase.scala    |  3 --
 .../MergeIntoPaimonDataEvolutionTable.scala        | 11 +++++--
 .../paimon/spark/sql/RowLineageTestBase.scala      | 37 ++++++++++++++++++++++
 3 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
index ea69d32751..8a52273eea 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
@@ -110,9 +110,6 @@ trait PaimonMergeIntoBase
         u.copy(assignments = alignAssignments(targetOutput, assignments))
 
       case i @ InsertAction(_, assignments) =>
-        if (assignments.length != targetOutput.length && dataEvolutionEnabled) {
-          throw new RuntimeException("Can't align the table's columns in insert clause.")
-        }
         i.copy(assignments = alignAssignments(targetOutput, assignments))
 
       case _: UpdateStarAction =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index b5aa82f148..d26426d6c4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -31,7 +31,7 @@ import org.apache.paimon.table.source.DataSplit
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils._
 import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, Literal}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -258,7 +258,14 @@ case class MergeIntoPaimonDataEvolutionTable(
         case insertAction: InsertAction =>
           Keep(
             insertAction.condition.getOrElse(TrueLiteral),
-            insertAction.assignments.map(a => a.value))
+            insertAction.assignments.map(
+              a =>
+                if (
+                  !a.value.isInstanceOf[AttributeReference] || joinPlan.output.exists(
+                    attr => attr.toString().equals(a.value.toString()))
+                ) a.value
+                else Literal(null))
+          )
       }.toSeq,
       notMatchedBySourceInstructions = Nil,
       checkCardinality = false,
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
index da43a2e91b..d8a3516a80 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTestBase.scala
@@ -221,6 +221,43 @@ abstract class RowLineageTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test("Data Evolution: insert into table with data-evolution partial insert") 
{
+    withTable("s", "t") {
+      sql("CREATE TABLE s (id INT, b INT)")
+      sql("INSERT INTO s VALUES (1, 11), (2, 22)")
+
+      sql(
+        "CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS b, id AS c 
FROM range(2, 4)")
+
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN NOT MATCHED THEN INSERT (id, b) VALUES (-1, b)
+            |""".stripMargin)
+
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN NOT MATCHED THEN INSERT (b) VALUES (b)
+            |""".stripMargin)
+
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN NOT MATCHED THEN INSERT (id, c) VALUES (3, 4)
+            |""".stripMargin)
+
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY id"),
+        Seq(Row(null, 11, null), Row(-1, 11, null), Row(2, 2, 2), Row(3, 3, 3), Row(3, null, 4))
+      )
+    }
+  }
+
   test("Data Evolution: merge into table with data-evolution") {
     withTable("s", "t") {
       sql("CREATE TABLE s (id INT, b INT)")

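For context, a minimal sketch (not taken from the commit; the helper name and standalone form are illustrative) of the rule the new InsertAction handling in MergeIntoPaimonDataEvolutionTable applies: an INSERT-clause value that is an AttributeReference absent from joinPlan.output stands for a target column the clause did not name, so it is written as NULL, while all other values pass through unchanged.

    // Sketch only, assuming Spark's Catalyst expression API.
    import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Literal}

    def fillUnspecifiedColumns(
        insertValues: Seq[Expression],
        joinOutput: Seq[Attribute]): Seq[Expression] =
      insertValues.map {
        // An attribute reference not produced by the source-side join plan marks a
        // target column the INSERT clause omitted: fill it with a NULL literal.
        case ref: AttributeReference if !joinOutput.exists(_.toString == ref.toString) =>
          Literal(null)
        // Literals and expressions over source columns are kept as written.
        case other => other
      }

The test added in RowLineageTestBase exercises exactly this: columns omitted from each INSERT clause come back as NULL in the final checkAnswer.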