This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 80e022b282 [flink] Add Flink procedure and action for branch merge
(#7931)
80e022b282 is described below
commit 80e022b2825cd62a7e693a39b3547dcaaf1fcc9c
Author: Junrui Lee <[email protected]>
AuthorDate: Fri May 22 13:31:13 2026 +0800
[flink] Add Flink procedure and action for branch merge (#7931)
---
docs/docs/flink/procedures.md | 20 +++++++
.../paimon/flink/action/MergeBranchAction.java | 43 +++++++++++++
.../flink/action/MergeBranchActionFactory.java | 70 ++++++++++++++++++++++
.../flink/procedure/MergeBranchProcedure.java | 70 ++++++++++++++++++++++
.../services/org.apache.paimon.factories.Factory | 2 +
.../org/apache/paimon/flink/BranchSqlITCase.java | 24 ++++++++
.../paimon/flink/action/BranchActionITCase.java | 56 +++++++++++++++++
7 files changed, 285 insertions(+)
diff --git a/docs/docs/flink/procedures.md b/docs/docs/flink/procedures.md
index 1e34926ae3..dc4f1fedaf 100644
--- a/docs/docs/flink/procedures.md
+++ b/docs/docs/flink/procedures.md
@@ -826,6 +826,26 @@ All available procedures are listed below.
CALL sys.fast_forward(`table` => 'default.T', branch => 'branch1')
</td>
</tr>
+ <tr>
+ <td>merge_branch</td>
+ <td>
+ -- Use named argument<br/>
+ CALL [catalog.]sys.merge_branch(`table` => 'identifier',
source_branch => 'sourceBranchName')<br/><br/>
+ CALL [catalog.]sys.merge_branch(`table` => 'identifier',
source_branch => 'sourceBranchName', target_branch => 'targetBranchName')
+ </td>
+ <td>
+ Merges data files from the source branch into the target branch of an append-only table.
+ The table must be created with <code>'branch-merge.enabled' =
'true'</code>. This option enforces a pure-append table history by rejecting
compaction and INSERT OVERWRITE, and it is incompatible with deletion vectors.
+ Requires compatible schema history and consistent row-tracking
settings between source and target. Arguments:
+ <li>table: the table identifier. Cannot be empty.</li>
+ <li>source_branch: name of the source branch to merge from. Cannot
be empty.</li>
+ <li>target_branch(optional): name of the target branch to merge
into. Default is 'main'.</li>
+ </td>
+ <td>
+ CALL sys.merge_branch(`table` => 'default.T', source_branch =>
'branch1')<br/><br/>
+ CALL sys.merge_branch(`table` => 'default.T', source_branch =>
'branch1', target_branch => 'branch2')
+ </td>
+ </tr>
<tr>
<td>compact_manifest</td>
<td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchAction.java
new file mode 100644
index 0000000000..b01ea095f9
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchAction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Map;
+
+/**
+ * Flink action that merges the data files of one branch of a table into another branch of the same
+ * table.
+ *
+ * <p>This is a {@link LocalAction}: all work happens in {@link #executeLocally()}, which delegates
+ * to the table's {@code mergeBranch} operation.
+ */
+public class MergeBranchAction extends TableActionBase implements LocalAction {
+
+    /** Name of the branch whose data files are merged. */
+    private final String sourceBranch;
+
+    /** Name of the branch that receives the merged data files. */
+    private final String targetBranch;
+
+    public MergeBranchAction(
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig,
+            String sourceBranch,
+            String targetBranch) {
+        super(databaseName, tableName, catalogConfig);
+        this.sourceBranch = sourceBranch;
+        this.targetBranch = targetBranch;
+    }
+
+    /** Merges {@code sourceBranch} into {@code targetBranch} on the table resolved by the base. */
+    @Override
+    public void executeLocally() throws Exception {
+        String mergeFrom = this.sourceBranch;
+        String mergeInto = this.targetBranch;
+        table.mergeBranch(mergeFrom, mergeInto);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchActionFactory.java
new file mode 100644
index 0000000000..06eb67a1a1
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchActionFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.Optional;
+
+/**
+ * Factory that turns the {@code merge_branch} command-line arguments into a {@link
+ * MergeBranchAction}.
+ */
+public class MergeBranchActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "merge_branch";
+
+    private static final String SOURCE_BRANCH = "source_branch";
+    private static final String TARGET_BRANCH = "target_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        // --target_branch is optional; without it, the main branch is the merge target.
+        String targetBranch = Identifier.DEFAULT_MAIN_BRANCH;
+        if (params.has(TARGET_BRANCH)) {
+            targetBranch = params.getRequired(TARGET_BRANCH);
+        }
+
+        String database = params.getRequired(DATABASE);
+        String table = params.getRequired(TABLE);
+        Action action =
+                new MergeBranchAction(
+                        database,
+                        table,
+                        catalogConfigMap(params),
+                        params.getRequired(SOURCE_BRANCH),
+                        targetBranch);
+        return Optional.of(action);
+    }
+
+    @Override
+    public void printHelp() {
+        String summary =
+                "Action \"merge_branch\" merges data files from a source branch "
+                        + "into a target branch.";
+        String syntax =
+                " merge_branch \\\n"
+                        + "--warehouse <warehouse_path> \\\n"
+                        + "--database <database_name> \\\n"
+                        + "--table <table_name> \\\n"
+                        + "--source_branch <source_branch_name> \\\n"
+                        + "[--target_branch <target_branch_name>]";
+
+        System.out.println(summary);
+        System.out.println();
+        System.out.println("Syntax:");
+        System.out.println(syntax);
+        System.out.println();
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java
new file mode 100644
index 0000000000..ebe6adca49
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import javax.annotation.Nullable;
+
+/**
+ * Procedure for merging the data files of a source branch into a target branch. Usage:
+ *
+ * <pre><code>
+ *  -- merge 'sourceBranch' into the main branch
+ *  CALL sys.merge_branch(`table` => 'tableId', source_branch => 'sourceBranch')
+ *
+ *  -- merge 'sourceBranch' into 'targetBranch'
+ *  CALL sys.merge_branch('tableId', 'sourceBranch', 'targetBranch')
+ * </code></pre>
+ */
+public class MergeBranchProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "merge_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Merges {@code sourceBranch} into {@code targetBranch} of the given table.
+     *
+     * @param procedureContext Flink procedure context (unused)
+     * @param tableId table identifier, e.g. {@code 'default.T'}
+     * @param sourceBranch branch to merge from; must not be null or blank
+     * @param targetBranch branch to merge into; null or blank means the main branch
+     * @return a single-element array containing {@code "Success"}
+     * @throws IllegalArgumentException if {@code sourceBranch} is null or blank
+     * @throws Catalog.TableNotExistException if the table does not exist
+     */
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "source_branch", type = @DataTypeHint("STRING")),
+                @ArgumentHint(
+                        name = "target_branch",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true)
+            })
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String sourceBranch,
+            @Nullable String targetBranch)
+            throws Catalog.TableNotExistException {
+        // The documented contract says source_branch cannot be empty; fail fast with a clear
+        // message instead of passing an empty branch name down to the table.
+        if (isBlank(sourceBranch)) {
+            throw new IllegalArgumentException("source_branch cannot be empty.");
+        }
+        // An omitted or blank target_branch falls back to the main branch.
+        String target = isBlank(targetBranch) ? Identifier.DEFAULT_MAIN_BRANCH : targetBranch;
+
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        table.mergeBranch(sourceBranch, target);
+        return new String[] {"Success"};
+    }
+
+    /** Returns true when {@code value} is null, empty, or only whitespace. */
+    private static boolean isBlank(@Nullable String value) {
+        return value == null || value.trim().isEmpty();
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index c8f4640017..eb83dcdcb3 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -38,6 +38,7 @@ org.apache.paimon.flink.action.MarkPartitionDoneActionFactory
org.apache.paimon.flink.action.CreateBranchActionFactory
org.apache.paimon.flink.action.DeleteBranchActionFactory
org.apache.paimon.flink.action.FastForwardActionFactory
+org.apache.paimon.flink.action.MergeBranchActionFactory
org.apache.paimon.flink.action.RenameTagActionFactory
org.apache.paimon.flink.action.RepairActionFactory
org.apache.paimon.flink.action.RewriteFileIndexActionFactory
@@ -85,6 +86,7 @@
org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
org.apache.paimon.flink.procedure.RepairProcedure
org.apache.paimon.flink.procedure.RenameTagProcedure
org.apache.paimon.flink.procedure.FastForwardProcedure
+org.apache.paimon.flink.procedure.MergeBranchProcedure
org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure
org.apache.paimon.flink.procedure.CopyFilesProcedure
org.apache.paimon.flink.procedure.CloneProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 2f164ca28f..e2e52ae3a3 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -418,6 +418,30 @@ public class BranchSqlITCase extends CatalogITCaseBase {
"Fast-forward from the current branch 'test' is not
allowed.");
}
+    /**
+     * Merges a branch into main via the {@code sys.merge_branch} procedure and checks both the
+     * target (main) and the source branch afterwards.
+     */
+    @Test
+    public void testBranchMerge() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " pt INT"
+                        + ", k INT"
+                        + ", v STRING"
+                        + " ) PARTITIONED BY (pt) WITH ("
+                        + " 'bucket' = '-1',"
+                        + " 'branch-merge.enabled' = 'true'"
+                        + " )");
+
+        sql("INSERT INTO T VALUES (1, 10, 'a')");
+        sql("CALL sys.create_branch('default.T', 'test')");
+        sql("INSERT INTO `T$branch_test` VALUES (2, 20, 'b')");
+
+        // Before the merge, main only sees its own row.
+        assertThat(collectResult("SELECT * FROM T")).containsExactlyInAnyOrder("+I[1, 10, a]");
+
+        sql("CALL sys.merge_branch(`table` => 'default.T', source_branch => 'test')");
+
+        // Main now also contains the row written on the branch...
+        assertThat(collectResult("SELECT * FROM T"))
+                .containsExactlyInAnyOrder("+I[1, 10, a]", "+I[2, 20, b]");
+        // ...and the source branch itself is left intact.
+        assertThat(collectResult("SELECT * FROM `T$branch_test`"))
+                .containsExactlyInAnyOrder("+I[1, 10, a]", "+I[2, 20, b]");
+    }
+
@Test
public void testFallbackBranchBatchRead() throws Exception {
sql(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index c0ece12168..55531e6c61 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -38,7 +38,9 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
import static org.assertj.core.api.Assertions.assertThat;
@@ -529,6 +531,60 @@ public class BranchActionITCase extends ActionITCaseBase {
assertEquals(expected, sortedActual);
}
+    /** Merges a branch into main through the {@code merge_branch} Flink action. */
+    @Test
+    void testMergeBranch() throws Exception {
+        init(warehouse);
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        Map<String, String> options = new HashMap<>();
+        options.put("bucket", "-1");
+        options.put("branch-merge.enabled", "true");
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        options);
+
+        // Write one row to the main branch.
+        StreamWriteBuilder writeBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        // Close the main-branch writer/committer before the shared fields are reassigned below,
+        // so they are not leaked. NOTE(review): assumes teardown only closes the objects the
+        // fields reference at the end of the test.
+        write.close();
+        commit.close();
+
+        executeSQL(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'merge_branch')", database, tableName));
+
+        // Write a second row to the new branch only.
+        FileStoreTable branchTable = table.switchToBranch("merge_branch");
+        StreamWriteBuilder branchWriteBuilder =
+                branchTable.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = branchWriteBuilder.newWrite();
+        commit = branchWriteBuilder.newCommit();
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+
+        createAction(
+                        MergeBranchAction.class,
+                        "merge_branch",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--source_branch",
+                        "merge_branch")
+                .run();
+
+        // No --target_branch was given, so main must now contain both rows.
+        table = getFileStoreTable(tableName);
+        List<String> result = readTableData(table);
+        assertThat(result).containsExactlyInAnyOrder("+I[1, Hi]", "+I[2, Hello]");
+    }
+
protected List<String> readTableData(FileStoreTable table) throws
Exception {
RowType rowType =
RowType.of(