This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1eae414989 [spark] Add Spark procedure for branch merge (#7930)
1eae414989 is described below

commit 1eae414989b948551e7454178d810f2ac85db4a1
Author: Junrui Lee <[email protected]>
AuthorDate: Fri May 22 20:20:47 2026 +0800

    [spark] Add Spark procedure for branch merge (#7930)
---
 docs/docs/spark/procedures.md                      |  15 +++
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 .../spark/procedure/MergeBranchProcedure.java      | 105 ++++++++++++++++++++
 .../spark/procedure/MergeBranchProcedureTest.scala | 110 +++++++++++++++++++++
 4 files changed, 232 insertions(+)

diff --git a/docs/docs/spark/procedures.md b/docs/docs/spark/procedures.md
index cdd157d6bb..008c892a37 100644
--- a/docs/docs/spark/procedures.md
+++ b/docs/docs/spark/procedures.md
@@ -360,6 +360,21 @@ This section introduce all available spark procedures 
about paimon.
           CALL sys.fast_forward(table => 'test_db.T', branch => 'test_branch')
       </td>
     </tr>
+   <tr>
+      <td>merge_branch</td>
+      <td>
+         Merge data files from source branch into target branch for 
append-only tables.
+         The table must be created with <code>'branch-merge.enabled' = 
'true'</code>. This option enforces a pure-append table history by rejecting 
compaction and INSERT OVERWRITE, and it is incompatible with deletion vectors.
+         Requires compatible schema history and consistent row-tracking 
settings between source and target. Arguments:
+            <li>table: the table identifier. Cannot be empty.</li>
+            <li>source_branch: name of the source branch to merge from. Cannot 
be empty.</li>
+            <li>target_branch(optional): name of the target branch to merge 
into. Default is 'main'.</li>
+      </td>
+      <td>
+         CALL sys.merge_branch(table => 'test_db.T', source_branch => 
'branch1')<br/><br/>
+         CALL sys.merge_branch(table => 'test_db.T', source_branch => 
'branch1', target_branch => 'branch2')
+      </td>
+    </tr>
    <tr>
       <td>reset_consumer</td>
       <td>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index f27478bbbd..ad0f2a93b7 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -39,6 +39,7 @@ import 
org.apache.paimon.spark.procedure.ExpireSnapshotsProcedure;
 import org.apache.paimon.spark.procedure.ExpireTagsProcedure;
 import org.apache.paimon.spark.procedure.FastForwardProcedure;
 import org.apache.paimon.spark.procedure.MarkPartitionDoneProcedure;
+import org.apache.paimon.spark.procedure.MergeBranchProcedure;
 import org.apache.paimon.spark.procedure.MigrateDatabaseProcedure;
 import org.apache.paimon.spark.procedure.MigrateTableProcedure;
 import org.apache.paimon.spark.procedure.Procedure;
@@ -111,6 +112,7 @@ public class SparkProcedures {
         procedureBuilders.put("expire_partitions", 
ExpirePartitionsProcedure::builder);
         procedureBuilders.put("repair", RepairProcedure::builder);
         procedureBuilders.put("fast_forward", FastForwardProcedure::builder);
+        procedureBuilders.put("merge_branch", MergeBranchProcedure::builder);
         procedureBuilders.put("reset_consumer", 
ResetConsumerProcedure::builder);
         procedureBuilders.put("mark_partition_done", 
MarkPartitionDoneProcedure::builder);
         procedureBuilders.put("compact_manifest", 
CompactManifestProcedure::builder);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MergeBranchProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MergeBranchProcedure.java
new file mode 100644
index 0000000000..dc4eadfed9
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MergeBranchProcedure.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.paimon.catalog.Identifier.DEFAULT_MAIN_BRANCH;
+import static org.apache.paimon.catalog.Identifier.SYSTEM_BRANCH_PREFIX;
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Procedure that merges one branch of a Paimon table into another by calling {@code
+ * table.mergeBranch(sourceBranch, targetBranch)}.
+ *
+ * <p>Arguments, in positional order:
+ *
+ * <ul>
+ *   <li>{@code table} (required): identifier of the table to operate on.
+ *   <li>{@code source_branch} (required): branch to merge from.
+ *   <li>{@code target_branch} (optional): branch to merge into; when the argument is null,
+ *       {@code DEFAULT_MAIN_BRANCH} is used.
+ * </ul>
+ *
+ * <p>The output is a single row with one boolean column named {@code result}; it is {@code true}
+ * whenever the merge call returns without throwing.
+ *
+ * <p>When the target is not {@code DEFAULT_MAIN_BRANCH}, the procedure additionally loads the
+ * Spark table named {@code tableName + SYSTEM_TABLE_SPLITTER + SYSTEM_BRANCH_PREFIX +
+ * targetBranch} in the same namespace and refreshes the Spark cache for it. NOTE(review): a
+ * main-branch target presumably relies on {@code modifyPaimonTable} for cache refresh; confirm.
+ *
+ * <p>Usage:
+ *
+ * <pre>{@code
+ * CALL sys.merge_branch('tableId', 'sourceBranch', 'targetBranch')
+ * CALL sys.merge_branch(table => 'tableId', source_branch => 'sourceBranch')
+ * }</pre>
+ */
+public class MergeBranchProcedure extends BaseProcedure {
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("table", StringType),
+                ProcedureParameter.required("source_branch", StringType),
+                ProcedureParameter.optional("target_branch", StringType)
+            };
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, 
Metadata.empty())
+                    });
+
+    protected MergeBranchProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
+        String sourceBranch = args.getString(1);
+        String targetBranch = args.isNullAt(2) ? DEFAULT_MAIN_BRANCH : 
args.getString(2);
+        InternalRow[] result =
+                modifyPaimonTable(
+                        tableIdent,
+                        table -> {
+                            table.mergeBranch(sourceBranch, targetBranch);
+                            InternalRow outputRow = newInternalRow(true);
+                            return new InternalRow[] {outputRow};
+                        });
+        if (!DEFAULT_MAIN_BRANCH.equals(targetBranch)) {
+            String branchTableName =
+                    tableIdent.name() + SYSTEM_TABLE_SPLITTER + 
SYSTEM_BRANCH_PREFIX + targetBranch;
+            Identifier branchIdent = Identifier.of(tableIdent.namespace(), 
branchTableName);
+            refreshSparkCache(branchIdent, loadSparkTable(branchIdent));
+        }
+        return result;
+    }
+
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<MergeBranchProcedure>() {
+            @Override
+            public MergeBranchProcedure doBuild() {
+                return new MergeBranchProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "MergeBranchProcedure";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MergeBranchProcedureTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MergeBranchProcedureTest.scala
new file mode 100644
index 0000000000..f3ab4bcf18
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MergeBranchProcedureTest.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+class MergeBranchProcedureTest extends PaimonSparkTestBase { // tests for CALL sys.merge_branch
+
+  test("Paimon procedure: merge branch test") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING)
+                 |USING PAIMON
+                 |TBLPROPERTIES ('branch-merge.enabled' = 'true')
+                 |""".stripMargin)
+
+    spark.sql(s"INSERT INTO T VALUES (1, 'a')")
+
+    spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 'tag1')")
+
+    checkAnswer(
+      spark.sql(
+        "CALL paimon.sys.create_branch(table => 'test.T', branch => 
'test_branch', tag => 'tag1')"),
+      Row(true) :: Nil)
+
+    spark.sql(s"INSERT INTO `T$$branch_test_branch` VALUES (2, 'b')")
+
+    checkAnswer( // target_branch is omitted, so the merge goes into the default target
+      spark.sql("CALL paimon.sys.merge_branch(table => 'test.T', source_branch 
=> 'test_branch')"),
+      Row(true) :: Nil)
+
+    checkAnswer(spark.sql("SELECT * FROM T ORDER BY id"), Row(1, "a") :: 
Row(2, "b") :: Nil)
+  }
+
+  test("Paimon procedure: merge branch on primary key table should fail") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING)
+                 |USING PAIMON
+                 |TBLPROPERTIES ('primary-key'='id', 'bucket'='1')
+                 |""".stripMargin)
+
+    spark.sql(s"INSERT INTO T VALUES (1, 'a')")
+
+    spark.sql("CALL paimon.sys.create_branch(table => 'test.T', branch => 
'test_branch')")
+
+    intercept[Exception] { // NOTE(review): 'branch-merge.enabled' also unset; cause ambiguous
+      spark.sql("CALL paimon.sys.merge_branch(table => 'test.T', source_branch 
=> 'test_branch')")
+    }
+  }
+
+  test("Paimon procedure: merge branch with non-existent branch should fail") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING)
+                 |USING PAIMON
+                 |TBLPROPERTIES ('branch-merge.enabled' = 'true')
+                 |""".stripMargin)
+
+    spark.sql(s"INSERT INTO T VALUES (1, 'a')")
+
+    intercept[Exception] { // no create_branch call precedes this, so 'nonexistent' is missing
+      spark.sql("CALL paimon.sys.merge_branch(table => 'test.T', source_branch 
=> 'nonexistent')")
+    }
+  }
+
+  test("Paimon procedure: merge branch with explicit target branch") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING)
+                 |USING PAIMON
+                 |TBLPROPERTIES ('branch-merge.enabled' = 'true')
+                 |""".stripMargin)
+
+    spark.sql(s"INSERT INTO T VALUES (1, 'a')")
+
+    spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 'tag1')")
+
+    spark.sql(
+      "CALL paimon.sys.create_branch(table => 'test.T', branch => 'branchA', 
tag => 'tag1')")
+    spark.sql(
+      "CALL paimon.sys.create_branch(table => 'test.T', branch => 'branchB', 
tag => 'tag1')")
+
+    spark.sql(s"INSERT INTO `T$$branch_branchA` VALUES (2, 'b')")
+
+    checkAnswer( // both source and target are named branches here
+      spark.sql(
+        "CALL paimon.sys.merge_branch(table => 'test.T', source_branch => 
'branchA', target_branch => 'branchB')"),
+      Row(true) :: Nil)
+
+    checkAnswer( // branchB is read through its branch table and must now contain branchA's row
+      spark.sql(s"SELECT * FROM `T$$branch_branchB` ORDER BY id"),
+      Row(1, "a") :: Row(2, "b") :: Nil)
+  }
+
+}

Reply via email to