This is an automated email from the ASF dual-hosted git repository.

liguojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 705c9590f [Core]Support merge branch (#2862)
705c9590f is described below

commit 705c9590f576278d20cb00da62319ade70d3794d
Author: TaoZex <[email protected]>
AuthorDate: Thu Jun 13 22:54:16 2024 +0800

    [Core]Support merge branch (#2862)
    
    * [Core]Support merge branch
---
 .../src/main/java/org/apache/paimon/fs/FileIO.java |  20 ++++
 .../paimon/privilege/PrivilegedFileStoreTable.java |   6 +
 .../java/org/apache/paimon/schema/TableSchema.java |  12 ++
 .../paimon/table/AbstractFileStoreTable.java       |   5 +
 .../org/apache/paimon/table/ReadonlyTable.java     |   8 ++
 .../main/java/org/apache/paimon/table/Table.java   |   4 +
 .../org/apache/paimon/utils/BranchManager.java     |  75 ++++++++++++
 .../org/apache/paimon/utils/SnapshotManager.java   |   6 +
 .../paimon/table/FileStoreTableTestBase.java       | 131 ++++++++++++++++++++-
 .../paimon/table/PrimaryKeyFileStoreTableTest.java |  18 +--
 .../flink/procedure/MergeBranchProcedure.java      |  54 +++++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../paimon/flink/action/BranchActionITCase.java    | 113 ++++++++++++++++--
 13 files changed, 437 insertions(+), 16 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index 55aa9b825..f31b00af1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -175,6 +175,12 @@ public interface FileIO extends Serializable {
         }
     }
 
+    /** Invokes {@link #deleteQuietly} on each path of {@code files}, in iteration order. */
+    default void deleteFilesQuietly(List<Path> files) {
+        files.forEach(this::deleteQuietly);
+    }
+
     default void deleteDirectoryQuietly(Path directory) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Ready to delete " + directory.toString());
@@ -272,6 +278,20 @@ public interface FileIO extends Serializable {
         return writeFileUtf8(targetPath, content);
     }
 
+    /**
+     * Copy every entry listed directly under {@code sourceDirectory} into {@code targetDirectory},
+     * keeping each entry's file name, by calling {@link #copyFileUtf8} once per entry.
+     *
+     * @param sourceDirectory directory whose entries are listed with {@link #listStatus}
+     * @param targetDirectory directory that receives the copies
+     * @throws IOException if listing the source directory or copying an entry fails
+     */
+    default void copyFilesUtf8(Path sourceDirectory, Path targetDirectory) throws IOException {
+        for (FileStatus fileStatus : listStatus(sourceDirectory)) {
+            Path sourceFile = fileStatus.getPath();
+            // Resolve the child against the parent path instead of concatenating strings.
+            copyFileUtf8(sourceFile, new Path(targetDirectory, sourceFile.getName()));
+        }
+    }
+
     /** Read file from {@link #overwriteFileUtf8} file. */
     default Optional<String> readOverwrittenFileUtf8(Path path) throws 
IOException {
         int retryNumber = 0;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index fe0c65728..548ae69ee 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -204,6 +204,12 @@ public class PrivilegedFileStoreTable implements 
FileStoreTable {
         wrapped.deleteBranch(branchName);
     }
 
+    @Override
+    public void mergeBranch(String branchName) {
+        // Require the insert privilege for `identifier` before delegating to the wrapped table.
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.mergeBranch(branchName);
+    }
+
     @Override
     public void replaceBranch(String fromBranch) {
         privilegeChecker.assertCanInsert(identifier);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index 2d86c96c1..4f94c6470 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.schema;
 
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
@@ -26,6 +28,7 @@ import org.apache.paimon.utils.StringUtils;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
@@ -286,6 +289,15 @@ public class TableSchema implements Serializable {
         return JsonSerdeUtil.fromJson(json, TableSchema.class);
     }
 
+    /**
+     * Loads the schema stored as JSON at {@code path}.
+     *
+     * @param fileIO file access used to read the file as UTF-8 text
+     * @param path location of the schema file
+     * @return the schema parsed by {@link #fromJson}
+     * @throws RuntimeException wrapping the {@link IOException} raised while reading
+     */
+    public static TableSchema fromPath(FileIO fileIO, Path path) {
+        try {
+            return fromJson(fileIO.readFileUtf8(path));
+        } catch (IOException readFailure) {
+            throw new RuntimeException("Fails to read schema from path " + path, readFailure);
+        }
+    }
+
     @Override
     public String toString() {
         return JsonSerdeUtil.toJson(this);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index bf8857e72..cddc1d7bf 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -524,6 +524,11 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         branchManager().deleteBranch(branchName);
     }
 
+    @Override
+    public void mergeBranch(String branchName) {
+        // Pure delegation: the merge itself is implemented by the object branchManager() returns.
+        branchManager().mergeBranch(branchName);
+    }
+
     @Override
     public void replaceBranch(String fromBranch) {
         branchManager().replaceBranch(fromBranch);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index dc50a83d9..70023d555 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -182,6 +182,14 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
+    @Override
+    default void mergeBranch(String branchName) {
+        // Read-only tables reject branch merging; the message names the concrete table class.
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support mergeBranch.",
+                        this.getClass().getSimpleName()));
+    }
+
     @Override
     default void replaceBranch(String fromBranch) {
         throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java 
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index d01ecc95c..227f2836d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -111,6 +111,10 @@ public interface Table extends Serializable {
     @Experimental
     void deleteBranch(String branchName);
 
+    /**
+     * Merge a branch into the main branch.
+     *
+     * @param branchName name of the branch to merge
+     */
+    @Experimental
+    void mergeBranch(String branchName);
+
     @Experimental
     void replaceBranch(String fromBranch);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 9742d63ac..63c33e593 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -21,6 +21,7 @@ package org.apache.paimon.utils;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.branch.TableBranch;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -41,8 +42,10 @@ import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
+import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Manager for {@code Branch}. */
@@ -353,6 +356,78 @@ public class BranchManager {
         }
     }
 
+    /**
+     * Merge {@code branchName} into the main branch.
+     *
+     * <p>Main-branch snapshots and tags whose snapshot id is at or after the branch's earliest
+     * snapshot, and main-branch schemas whose id is at or after that snapshot's schema id, are
+     * deleted together with the latest snapshot hint. Afterwards every file in the branch's
+     * snapshot, schema and tag directories is copied into the matching main-branch directory.
+     *
+     * @param branchName name of an existing branch other than the main branch
+     * @throws IllegalArgumentException if the name is blank, names the main branch, does not
+     *     exist, or the branch has no earliest snapshot
+     * @throws RuntimeException wrapping the {@link IOException} raised while listing, deleting or
+     *     copying files
+     */
+    public void mergeBranch(String branchName) {
+        // Check for blank first so the equals() call below never sees a null name
+        // (assumes StringUtils.isBlank tolerates null -- TODO confirm).
+        checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName);
+        checkArgument(
+                !branchName.equals(DEFAULT_MAIN_BRANCH),
+                "Branch name '%s' do not use in merge branch.",
+                branchName);
+        checkArgument(branchExists(branchName), "Branch name '%s' doesn't exist.", branchName);
+
+        // Build the branch-scoped snapshot manager once and reuse it below.
+        SnapshotManager branchSnapshotManager = snapshotManager.copyWithBranch(branchName);
+        Long earliestSnapshotId = branchSnapshotManager.earliestSnapshotId();
+        // The id is a boxed Long; reject a missing value explicitly instead of failing with a
+        // NullPointerException when it is unboxed.
+        checkArgument(
+                earliestSnapshotId != null,
+                "Branch name '%s' has no snapshot to merge.",
+                branchName);
+        long minSnapshotId = earliestSnapshotId;
+        long minSchemaId = branchSnapshotManager.snapshot(minSnapshotId).schemaId();
+
+        try {
+            // Collect the main-branch snapshots, schemas and tags that sit at or after the
+            // branch's starting point; they are removed before the branch files are copied in.
+            List<Path> deleteSnapshotPaths =
+                    listVersionedFileStatus(
+                                    fileIO, snapshotManager.snapshotDirectory(), "snapshot-")
+                            .map(FileStatus::getPath)
+                            .filter(
+                                    path ->
+                                            Snapshot.fromPath(fileIO, path).id()
+                                                    >= minSnapshotId)
+                            .collect(Collectors.toList());
+            List<Path> deleteSchemaPaths =
+                    listVersionedFileStatus(fileIO, schemaManager.schemaDirectory(), "schema-")
+                            .map(FileStatus::getPath)
+                            .filter(
+                                    path ->
+                                            TableSchema.fromPath(fileIO, path).id()
+                                                    >= minSchemaId)
+                            .collect(Collectors.toList());
+            // Tag files are parsed as snapshots, so they are filtered by snapshot id as well.
+            List<Path> deleteTagPaths =
+                    listVersionedFileStatus(fileIO, tagManager.tagDirectory(), "tag-")
+                            .map(FileStatus::getPath)
+                            .filter(
+                                    path ->
+                                            Snapshot.fromPath(fileIO, path).id()
+                                                    >= minSnapshotId)
+                            .collect(Collectors.toList());
+
+            List<Path> deletePaths =
+                    Stream.of(deleteSnapshotPaths, deleteSchemaPaths, deleteTagPaths)
+                            .flatMap(List::stream)
+                            .collect(Collectors.toList());
+
+            // Delete latest snapshot hint
+            snapshotManager.deleteLatestHint();
+
+            fileIO.deleteFilesQuietly(deletePaths);
+            fileIO.copyFilesUtf8(
+                    branchSnapshotManager.snapshotDirectory(),
+                    snapshotManager.snapshotDirectory());
+            fileIO.copyFilesUtf8(
+                    schemaManager.copyWithBranch(branchName).schemaDirectory(),
+                    schemaManager.schemaDirectory());
+            fileIO.copyFilesUtf8(
+                    tagManager.copyWithBranch(branchName).tagDirectory(),
+                    tagManager.tagDirectory());
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Exception occurs when merge branch '%s' (directory in %s).",
+                            branchName, getBranchPath(fileIO, tablePath, branchName)),
+                    e);
+        }
+    }
+
     /** Check if a branch exists. */
     public boolean branchExists(String branchName) {
         Path branchPath = branchPath(branchName);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 93442fd7c..af1d450db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -634,6 +634,12 @@ public class SnapshotManager implements Serializable {
         return listVersionedFiles(fileIO, dir, 
prefix).reduce(reducer).orElse(null);
     }
 
+    /**
+     * Deletes the {@code LATEST} hint file located in {@link #snapshotDirectory()}.
+     *
+     * @throws IOException if the delete call on the underlying file IO fails
+     */
+    public void deleteLatestHint() throws IOException {
+        fileIO.delete(new Path(snapshotDirectory(), LATEST), false);
+    }
+
     public void commitLatestHint(long snapshotId) throws IOException {
         commitHint(snapshotId, LATEST, snapshotDirectory());
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 35b7666c9..6ff0bd441 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -1165,6 +1165,136 @@ public abstract class FileStoreTableTestBase {
                                 "Branch name 'branch1' doesn't exist."));
     }
 
+    @Test
+    public void testMergeBranch() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        generateBranch(table);
+        FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+
+        // Verify the data initially readable from branch1
+        assertThat(readRowsAsStrings(tableBranch))
+                .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Test for unsupported branch name
+        assertThatThrownBy(() -> table.mergeBranch("test-branch"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch name 'test-branch' doesn't exist."));
+
+        assertThatThrownBy(() -> table.mergeBranch("main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch name 'main' do not use in merge branch."));
+
+        // Write data to branch1
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Validate data in branch1
+        assertThat(readRowsAsStrings(tableBranch))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Validate data in main branch not changed
+        assertThat(readRowsAsStrings(table))
+                .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Merge branch1 to main branch
+        table.mergeBranch(BRANCH_NAME);
+
+        // After merging branch1, verify the main branch has the same data as branch1
+        assertThat(readRowsAsStrings(table))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Verify snapshot 2 is the same in branch1 and the main branch
+        TraceableFileIO traceableFileIO = new TraceableFileIO();
+        SnapshotManager snapshotManager = new SnapshotManager(traceableFileIO, tablePath);
+        Snapshot branchSnapshot =
+                Snapshot.fromPath(
+                        traceableFileIO,
+                        snapshotManager.copyWithBranch(BRANCH_NAME).snapshotPath(2));
+        Snapshot snapshot = Snapshot.fromPath(traceableFileIO, snapshotManager.snapshotPath(2));
+        // isEqualTo still relies on equals() but reports both values when the check fails.
+        assertThat(branchSnapshot).isEqualTo(snapshot);
+
+        // Verify schema 0 is the same in branch1 and the main branch
+        SchemaManager schemaManager = new SchemaManager(traceableFileIO, tablePath);
+        TableSchema branchSchema =
+                SchemaManager.fromPath(
+                        traceableFileIO,
+                        schemaManager.copyWithBranch(BRANCH_NAME).toSchemaPath(0));
+        TableSchema schema0 = schemaManager.schema(0);
+        assertThat(branchSchema).isEqualTo(schema0);
+
+        // Write two rows data to branch1 again
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(3, 30, 300L));
+            write.write(rowData(4, 40, 400L));
+            commit.commit(2, write.prepareCommit(false, 3));
+        }
+
+        // Verify data in branch1
+        assertThat(readRowsAsStrings(tableBranch))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                        "3|30|300|binary|varbinary|mapKey:mapVal|multiset",
+                        "4|40|400|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Verify data in main branch not changed
+        assertThat(readRowsAsStrings(table))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Merge branch1 to main branch again
+        table.mergeBranch("branch1");
+
+        // Verify data in main branch is same to branch1
+        assertThat(readRowsAsStrings(table))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset",
+                        "3|30|300|binary|varbinary|mapKey:mapVal|multiset",
+                        "4|40|400|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
+    /** Reads all data splits of the table's current snapshot and renders each row as a string. */
+    private List<String> readRowsAsStrings(FileStoreTable fileStoreTable) throws Exception {
+        return getResult(
+                fileStoreTable.newRead(),
+                toSplits(fileStoreTable.newSnapshotReader().read().dataSplits()),
+                BATCH_ROW_TO_STRING);
+    }
+
     @Test
     public void testUnsupportedTagName() throws Exception {
         FileStoreTable table = createFileStoreTable();
@@ -1636,7 +1766,6 @@ public abstract class FileStoreTableTestBase {
         table.createBranch(BRANCH_NAME, "tag1");
 
         // verify that branch1 file exist
-        TraceableFileIO fileIO = new TraceableFileIO();
         BranchManager branchManager = table.branchManager();
         assertThat(branchManager.branchExists(BRANCH_NAME)).isTrue();
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 095511ae1..eccb68221 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -1696,15 +1696,17 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         options.set(BUCKET, 1);
         options.set(BRANCH, branch);
         configure.accept(options);
+        TableSchema latestSchema =
+                new SchemaManager(LocalFileIO.create(), 
tablePath).latest().get();
         TableSchema tableSchema =
-                SchemaUtils.forceCommit(
-                        new SchemaManager(LocalFileIO.create(), tablePath),
-                        new Schema(
-                                rowType.getFields(),
-                                Collections.singletonList("pt"),
-                                Arrays.asList("pt", "a"),
-                                options.toMap(),
-                                ""));
+                new TableSchema(
+                        latestSchema.id(),
+                        latestSchema.fields(),
+                        latestSchema.highestFieldId(),
+                        latestSchema.partitionKeys(),
+                        latestSchema.primaryKeys(),
+                        options.toMap(),
+                        latestSchema.comment());
         return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java
new file mode 100644
index 000000000..e7eb3eb33
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeBranchProcedure.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Procedure that merges a branch of a table into the table's main branch. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.merge_branch('tableId', 'branchName')
+ * </code></pre>
+ */
+public class MergeBranchProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "merge_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Looks up {@code tableId} in the catalog and calls {@link Table#mergeBranch} on it.
+     *
+     * @param procedureContext context handed in by the caller; not used by this procedure
+     * @param tableId table identifier string parsed with {@link Identifier#fromString}
+     * @param branchName name of the branch to merge
+     * @return a single-element array containing {@code "Success"}
+     * @throws Catalog.TableNotExistException propagated from the catalog lookup
+     */
+    public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
+            throws Catalog.TableNotExistException {
+        Identifier tableIdentifier = Identifier.fromString(tableId);
+        Table targetTable = catalog.getTable(tableIdentifier);
+        targetTable.mergeBranch(branchName);
+        return new String[] {"Success"};
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 38cb36713..192be6c14 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -53,3 +53,4 @@ 
org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
 org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
 org.apache.paimon.flink.procedure.RepairProcedure
 org.apache.paimon.flink.procedure.ReplaceBranchProcedure
+org.apache.paimon.flink.procedure.MergeBranchProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index 209b0d2e7..a41d46e38 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -227,13 +227,7 @@ class BranchActionITCase extends ActionITCaseBase {
             writeData(rowData(i, 
BinaryString.fromString(String.format("new.data_%s", i))));
         }
 
-        ReadBuilder readBuilder = table.newReadBuilder();
-        TableScan.Plan plan = readBuilder.newScan().plan();
-        List<String> result =
-                getResult(
-                        readBuilder.newRead(),
-                        plan == null ? Collections.emptyList() : plan.splits(),
-                        rowType);
+        List<String> result = readTableData(table);
         List<String> sortedActual = new ArrayList<>(result);
         List<String> expected =
                 Arrays.asList(
@@ -255,4 +249,109 @@ class BranchActionITCase extends ActionITCaseBase {
                 String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", 
database, tableName));
         assertThat(tagManager.tagExists("tag3")).isTrue();
     }
+
+    @Test
+    void testMergeBranch() throws Exception {
+        init(warehouse);
+        RowType keyValueType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        keyValueType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // Three writes on the main branch, one snapshot each
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        // Tag snapshot 2 as tag2
+        TagManager tagManager = new TagManager(table.fileIO(), table.location());
+        callProcedure(
+                String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
+        assertThat(tagManager.tagExists("tag2")).isTrue();
+
+        // Branch off from tag2
+        callProcedure(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'merge_branch_name', 'tag2')",
+                        database, tableName));
+        assertThat(table.branchManager().branchExists("merge_branch_name")).isTrue();
+
+        // First merge of the branch into the main branch
+        callProcedure(
+                String.format(
+                        "CALL sys.merge_branch('%s.%s', 'merge_branch_name')",
+                        database, tableName));
+
+        // Snapshot 3 of the main branch must be gone after the merge
+        assertThat(table.snapshotManager().snapshotExists(3)).isFalse();
+
+        // Renew write
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // Write keys 4..13 to the main branch on top of the merged state
+        for (long key = 4; key <= 13; key++) {
+            writeData(rowData(key, BinaryString.fromString(String.format("new.data_%s", key))));
+        }
+
+        // Main branch: rows 1 and 2 from the branch plus the ten new rows; row 3 is not expected
+        List<String> actualRows = new ArrayList<>(readTableData(table));
+        List<String> expectedRows =
+                Arrays.asList(
+                        "+I[1, Hi]",
+                        "+I[2, Hello]",
+                        "+I[4, new.data_4]",
+                        "+I[5, new.data_5]",
+                        "+I[6, new.data_6]",
+                        "+I[7, new.data_7]",
+                        "+I[8, new.data_8]",
+                        "+I[9, new.data_9]",
+                        "+I[10, new.data_10]",
+                        "+I[11, new.data_11]",
+                        "+I[12, new.data_12]",
+                        "+I[13, new.data_13]");
+        Assert.assertEquals(expectedRows, actualRows);
+
+        // Second merge of the same branch
+        callProcedure(
+                String.format(
+                        "CALL sys.merge_branch('%s.%s', 'merge_branch_name')",
+                        database, tableName));
+
+        // Main branch is expected to hold only the branch's rows again
+        actualRows = new ArrayList<>(readTableData(table));
+        expectedRows = Arrays.asList("+I[1, Hi]", "+I[2, Hello]");
+        Assert.assertEquals(expectedRows, actualRows);
+    }
+
+    /**
+     * Plans a scan over {@code table} with a new read builder and returns the rows produced by
+     * {@code getResult}, decoded with a fixed (k BIGINT, v STRING) row type.
+     */
+    List<String> readTableData(FileStoreTable table) throws Exception {
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        return getResult(
+                readBuilder.newRead(),
+                plan == null ? Collections.emptyList() : plan.splits(),
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+                        new String[] {"k", "v"}));
+    }
 }

Reply via email to