This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 80e022b282 [flink] Add Flink procedure and action for branch merge (#7931)
80e022b282 is described below

commit 80e022b2825cd62a7e693a39b3547dcaaf1fcc9c
Author: Junrui Lee <[email protected]>
AuthorDate: Fri May 22 13:31:13 2026 +0800

    [flink] Add Flink procedure and action for branch merge (#7931)
---
 docs/docs/flink/procedures.md                      | 20 +++++++
 .../paimon/flink/action/MergeBranchAction.java     | 43 +++++++++++++
 .../flink/action/MergeBranchActionFactory.java     | 70 ++++++++++++++++++++++
 .../flink/procedure/MergeBranchProcedure.java      | 70 ++++++++++++++++++++++
 .../services/org.apache.paimon.factories.Factory   |  2 +
 .../org/apache/paimon/flink/BranchSqlITCase.java   | 24 ++++++++
 .../paimon/flink/action/BranchActionITCase.java    | 56 +++++++++++++++++
 7 files changed, 285 insertions(+)

diff --git a/docs/docs/flink/procedures.md b/docs/docs/flink/procedures.md
index 1e34926ae3..dc4f1fedaf 100644
--- a/docs/docs/flink/procedures.md
+++ b/docs/docs/flink/procedures.md
@@ -826,6 +826,26 @@ All available procedures are listed below.
          CALL sys.fast_forward(`table` => 'default.T', branch => 'branch1')
       </td>
    </tr>
+   <tr>
+      <td>merge_branch</td>
+      <td>
+         -- Use named argument<br/>
+         CALL [catalog.]sys.merge_branch(`table` => 'identifier', source_branch => 'sourceBranchName')<br/><br/>
+         CALL [catalog.]sys.merge_branch(`table` => 'identifier', source_branch => 'sourceBranchName', target_branch => 'targetBranchName')
+      </td>
+      <td>
+         Merge data files from source branch into target branch for append-only tables.
+         The table must be created with <code>'branch-merge.enabled' = 'true'</code>. This option enforces a pure-append table history by rejecting compaction and INSERT OVERWRITE, and it is incompatible with deletion vectors.
+         Requires compatible schema history and consistent row-tracking settings between source and target. Arguments:
+            <li>table: the table identifier. Cannot be empty.</li>
+            <li>source_branch: name of the source branch to merge from. Cannot be empty.</li>
+            <li>target_branch(optional): name of the target branch to merge into. Default is 'main'.</li>
+      </td>
+      <td>
+         CALL sys.merge_branch(`table` => 'default.T', source_branch => 'branch1')<br/><br/>
+         CALL sys.merge_branch(`table` => 'default.T', source_branch => 'branch1', target_branch => 'branch2')
+      </td>
+   </tr>
    <tr>
       <td>compact_manifest</td>
       <td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchAction.java
new file mode 100644
index 0000000000..b01ea095f9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchAction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Map;
+
+/** Merge branch action for Flink. */
+public class MergeBranchAction extends TableActionBase implements LocalAction {
+    private final String sourceBranch;
+    private final String targetBranch;
+
+    public MergeBranchAction(
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig,
+            String sourceBranch,
+            String targetBranch) {
+        super(databaseName, tableName, catalogConfig);
+        this.sourceBranch = sourceBranch;
+        this.targetBranch = targetBranch;
+    }
+
+    @Override
+    public void executeLocally() throws Exception {
+        table.mergeBranch(sourceBranch, targetBranch);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchActionFactory.java
new file mode 100644
index 0000000000..06eb67a1a1
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchActionFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.Optional;
+
+/** Factory to create {@link MergeBranchAction}. */
+public class MergeBranchActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "merge_branch";
+
+    private static final String SOURCE_BRANCH = "source_branch";
+    private static final String TARGET_BRANCH = "target_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        String targetBranch =
+                params.has(TARGET_BRANCH)
+                        ? params.getRequired(TARGET_BRANCH)
+                        : Identifier.DEFAULT_MAIN_BRANCH;
+        MergeBranchAction action =
+                new MergeBranchAction(
+                        params.getRequired(DATABASE),
+                        params.getRequired(TABLE),
+                        catalogConfigMap(params),
+                        params.getRequired(SOURCE_BRANCH),
+                        targetBranch);
+        return Optional.of(action);
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println(
+                "Action \"merge_branch\" merges data files from a source 
branch into a target branch.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  merge_branch \\\n"
+                        + "--warehouse <warehouse_path> \\\n"
+                        + "--database <database_name> \\\n"
+                        + "--table <table_name> \\\n"
+                        + "--source_branch <source_branch_name> \\\n"
+                        + "[--target_branch <target_branch_name>]");
+        System.out.println();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java
new file mode 100644
index 0000000000..ebe6adca49
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import javax.annotation.Nullable;
+
+/**
+ * Procedure for merging branches. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.merge_branch('tableId', 'sourceBranch', 'targetBranch')
+ * </code></pre>
+ */
+public class MergeBranchProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "merge_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "source_branch", type = 
@DataTypeHint("STRING")),
+                @ArgumentHint(
+                        name = "target_branch",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true)
+            })
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String sourceBranch,
+            @Nullable String targetBranch)
+            throws Catalog.TableNotExistException {
+        if (targetBranch == null) {
+            targetBranch = Identifier.DEFAULT_MAIN_BRANCH;
+        }
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        table.mergeBranch(sourceBranch, targetBranch);
+        return new String[] {"Success"};
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index c8f4640017..eb83dcdcb3 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -38,6 +38,7 @@ org.apache.paimon.flink.action.MarkPartitionDoneActionFactory
 org.apache.paimon.flink.action.CreateBranchActionFactory
 org.apache.paimon.flink.action.DeleteBranchActionFactory
 org.apache.paimon.flink.action.FastForwardActionFactory
+org.apache.paimon.flink.action.MergeBranchActionFactory
 org.apache.paimon.flink.action.RenameTagActionFactory
 org.apache.paimon.flink.action.RepairActionFactory
 org.apache.paimon.flink.action.RewriteFileIndexActionFactory
@@ -85,6 +86,7 @@ org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
 org.apache.paimon.flink.procedure.RepairProcedure
 org.apache.paimon.flink.procedure.RenameTagProcedure
 org.apache.paimon.flink.procedure.FastForwardProcedure
+org.apache.paimon.flink.procedure.MergeBranchProcedure
 org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure
 org.apache.paimon.flink.procedure.CopyFilesProcedure
 org.apache.paimon.flink.procedure.CloneProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 2f164ca28f..e2e52ae3a3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -418,6 +418,30 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                         "Fast-forward from the current branch 'test' is not 
allowed.");
     }
 
+    @Test
+    public void testBranchMerge() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " pt INT"
+                        + ", k INT"
+                        + ", v STRING"
+                        + " ) PARTITIONED BY (pt) WITH ("
+                        + " 'bucket' = '-1',"
+                        + " 'branch-merge.enabled' = 'true'"
+                        + " )");
+
+        sql("INSERT INTO T VALUES (1, 10, 'a')");
+
+        sql("CALL sys.create_branch('default.T', 'test')");
+
+        sql("INSERT INTO `T$branch_test` VALUES (2, 20, 'b')");
+
+        sql("CALL sys.merge_branch(`table` => 'default.T', source_branch => 
'test')");
+
+        assertThat(collectResult("SELECT * FROM T"))
+                .containsExactlyInAnyOrder("+I[1, 10, a]", "+I[2, 20, b]");
+    }
+
     @Test
     public void testFallbackBranchBatchRead() throws Exception {
         sql(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index c0ece12168..55531e6c61 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -38,7 +38,9 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -529,6 +531,60 @@ public class BranchActionITCase extends ActionITCaseBase {
         assertEquals(expected, sortedActual);
     }
 
+    @Test
+    void testMergeBranch() throws Exception {
+        init(warehouse);
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        Map<String, String> options = new HashMap<>();
+        options.put("bucket", "-1");
+        options.put("branch-merge.enabled", "true");
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        options);
+
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+
+        executeSQL(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'merge_branch')", 
database, tableName));
+
+        FileStoreTable branchTable = table.switchToBranch("merge_branch");
+        StreamWriteBuilder branchWriteBuilder =
+                branchTable.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = branchWriteBuilder.newWrite();
+        commit = branchWriteBuilder.newCommit();
+
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+
+        createAction(
+                        MergeBranchAction.class,
+                        "merge_branch",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--source_branch",
+                        "merge_branch")
+                .run();
+
+        table = getFileStoreTable(tableName);
+        List<String> result = readTableData(table);
+        assertThat(result).containsExactlyInAnyOrder("+I[1, Hi]", "+I[2, 
Hello]");
+    }
+
     protected List<String> readTableData(FileStoreTable table) throws 
Exception {
         RowType rowType =
                 RowType.of(

Reply via email to