This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f336ba868a [core][rest] Add branch merge support for append-only tables (#7882)
f336ba868a is described below

commit f336ba868a2170f6dc05385ada2a815cf1f0764b
Author: Junrui Lee <[email protected]>
AuthorDate: Thu May 21 21:37:03 2026 +0800

    [core][rest] Add branch merge support for append-only tables (#7882)
    
    This PR is the first part of #7863, focusing on the core APIs,
    implementation, and REST support for branch merge.
    
    It adds `mergeBranch`, which incrementally adds data files that exist
    only in the source branch to the target branch, without replacing either
    branch. Correctness is guarded by the immutable table option
    `'branch-merge.enabled' = 'true'`, which enforces the pure-append
    invariant required by file-level branch merge: compaction and `INSERT
    OVERWRITE` are rejected, and deletion vectors are not supported.
---
 docs/static/rest-catalog-open-api.yaml             |    2 +-
 .../java/org/apache/paimon/io/DataFileMeta.java    |    2 +
 .../org/apache/paimon/io/PojoDataFileMeta.java     |   25 +
 .../paimon/privilege/PrivilegedFileStoreTable.java |    6 +
 .../paimon/table/AbstractFileStoreTable.java       |   13 +-
 .../paimon/table/DelegatedFileStoreTable.java      |    5 +
 .../java/org/apache/paimon/table/FormatTable.java  |    5 +
 .../org/apache/paimon/table/ReadonlyTable.java     |    8 +
 .../main/java/org/apache/paimon/table/Table.java   |    4 +
 .../org/apache/paimon/utils/BranchManager.java     |   17 +
 .../apache/paimon/utils/BranchMergeHandler.java    |  144 +++
 .../apache/paimon/utils/CatalogBranchManager.java  |    5 +
 .../paimon/utils/FileSystemBranchManager.java      |  203 +++-
 .../paimon/table/AppendOnlySimpleTableTest.java    | 1104 ++++++++++++++++++++
 .../paimon/table/PrimaryKeySimpleTableTest.java    |   18 +
 .../paimon/utils/FileSystemBranchManagerTest.java  |    2 +-
 16 files changed, 1559 insertions(+), 4 deletions(-)

diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml
index ba4c8bed4b..e310f398f8 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -3615,4 +3615,4 @@ components:
   securitySchemes:
     BearerAuth:
       type: http
-      scheme: bearer
\ No newline at end of file
+      scheme: bearer
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index a7a4de0b5c..23b34947bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -329,6 +329,8 @@ public interface DataFileMeta {
 
     DataFileMeta assignFirstRowId(long firstRowId);
 
+    DataFileMeta newFirstRowId(@Nullable Long newFirstRowId);
+
     default List<Path> collectFiles(DataFilePathFactory pathFactory) {
         List<Path> paths = new ArrayList<>();
         paths.add(pathFactory.toPath(this));
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
index 5003974e8e..9e845b26fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
@@ -379,6 +379,31 @@ public class PojoDataFileMeta implements DataFileMeta {
                 writeCols);
     }
 
+    @Override
+    public PojoDataFileMeta newFirstRowId(@Nullable Long newFirstRowId) {
+        return new PojoDataFileMeta(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                keyStats,
+                valueStats,
+                minSequenceNumber,
+                maxSequenceNumber,
+                schemaId,
+                level,
+                extraFiles,
+                creationTime,
+                deleteRowCount,
+                embeddedIndex,
+                fileSource,
+                valueStatsCols,
+                externalPath,
+                newFirstRowId,
+                writeCols);
+    }
+
     @Override
     public PojoDataFileMeta copy(List<String> newExtraFiles) {
         return new PojoDataFileMeta(
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index a2c3e62c36..35dfd308df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -204,6 +204,12 @@ public class PrivilegedFileStoreTable extends DelegatedFileStoreTable {
         wrapped.fastForward(branchName);
     }
 
+    @Override
+    public void mergeBranch(String sourceBranch, String targetBranch) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.mergeBranch(sourceBranch, targetBranch);
+    }
+
     @Override
     public ExpireSnapshots newExpireSnapshots() {
         privilegeChecker.assertCanInsert(identifier);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 522d25c7d3..df42249159 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -55,6 +55,7 @@ import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.BranchMergeHandler;
 import org.apache.paimon.utils.CatalogBranchManager;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.DVMetaCache;
@@ -737,6 +738,11 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
         branchManager().fastForward(branchName);
     }
 
+    @Override
+    public void mergeBranch(String sourceBranch, String targetBranch) {
+        branchManager().mergeBranch(sourceBranch, targetBranch);
+    }
+
     @Override
     public TagManager tagManager() {
         return new TagManager(fileIO, path, currentBranch(), coreOptions());
@@ -749,7 +755,12 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
             return new 
CatalogBranchManager(catalogEnvironment.catalogLoader(), identifier());
         }
         return new FileSystemBranchManager(
-                fileIO, path, snapshotManager(), tagManager(), 
schemaManager());
+                fileIO,
+                path,
+                snapshotManager(),
+                tagManager(),
+                schemaManager(),
+                new BranchMergeHandler(this::switchToBranch));
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 1f234d81ff..7ba4bc20a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -295,6 +295,11 @@ public abstract class DelegatedFileStoreTable implements FileStoreTable {
         wrapped.fastForward(branchName);
     }
 
+    @Override
+    public void mergeBranch(String sourceBranch, String targetBranch) {
+        wrapped.mergeBranch(sourceBranch, targetBranch);
+    }
+
     @Override
     public ExpireSnapshots newExpireSnapshots() {
         return wrapped.newExpireSnapshots();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index 7f13faf753..aee60f65ee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -427,6 +427,11 @@ public interface FormatTable extends Table {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    default void mergeBranch(String sourceBranch, String targetBranch) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     default ExpireSnapshots newExpireSnapshots() {
         throw new UnsupportedOperationException();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index b685f86e3e..8d7add5bc5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -281,6 +281,14 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
+    @Override
+    default void mergeBranch(String sourceBranch, String targetBranch) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support mergeBranch.",
+                        this.getClass().getSimpleName()));
+    }
+
     @Override
     default ExpireSnapshots newExpireSnapshots() {
         throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 98c00c0101..f58259b1fc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -210,6 +210,10 @@ public interface Table extends Serializable {
     @Experimental
     void fastForward(String branchName);
 
+    /** Merge source branch into target branch (append-only tables only). */
+    @Experimental
+    void mergeBranch(String sourceBranch, String targetBranch);
+
     /** Manually expire snapshots, parameters can be controlled independently 
of table options. */
     @Experimental
     ExpireSnapshots newExpireSnapshots();
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index c830a799bf..085f04747c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -59,6 +59,8 @@ public interface BranchManager {
 
     void fastForward(String branchName);
 
+    void mergeBranch(String sourceBranch, String targetBranch);
+
     void renameBranch(String fromBranch, String toBranch);
 
     List<String> branches();
@@ -98,6 +100,21 @@ public interface BranchManager {
                 branchName);
     }
 
+    static void mergeValidate(String sourceBranch, String targetBranch) {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(sourceBranch),
+                "Source branch name '%s' is blank.",
+                sourceBranch);
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(targetBranch),
+                "Target branch name '%s' is blank.",
+                targetBranch);
+        checkArgument(
+                !sourceBranch.equals(targetBranch),
+                "Cannot merge branch '%s' into itself.",
+                sourceBranch);
+    }
+
     static void fastForwardValidate(String branchName, String currentBranch) {
         checkArgument(
                 !branchName.equals(DEFAULT_MAIN_BRANCH),
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java
new file mode 100644
index 0000000000..0327fd0bcf
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.FileStoreCommit;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Branch merge handler backed by {@link FileStoreTable}. */
+public class BranchMergeHandler {
+
+    private final Function<String, FileStoreTable> branchTableFactory;
+
+    public BranchMergeHandler(Function<String, FileStoreTable> 
branchTableFactory) {
+        this.branchTableFactory = branchTableFactory;
+    }
+
+    public Map<FileEntry.Identifier, ManifestEntry> readBranchFiles(String 
branch) {
+        FileStoreTable branchTable = branchTableFactory.apply(branch);
+        Snapshot snapshot = branchTable.snapshotManager().latestSnapshot();
+        checkArgument(
+                snapshot != null,
+                "Cannot read branch '%s', because it does not have any 
snapshot.",
+                branch);
+        ManifestList manifestList = 
branchTable.store().manifestListFactory().create();
+        ManifestFile manifestFile = 
branchTable.store().manifestFileFactory().create();
+        Map<FileEntry.Identifier, ManifestEntry> files = new LinkedHashMap<>();
+        FileEntry.mergeEntries(manifestFile, 
manifestList.readDataManifests(snapshot), files, null);
+        return files;
+    }
+
+    public void commit(String targetBranch, List<ManifestEntry> filesToMerge) {
+        FileStoreTable branchTable = branchTableFactory.apply(targetBranch);
+        boolean rowTrackingEnabled =
+                new 
CoreOptions(branchTable.schema().options()).rowTrackingEnabled();
+
+        Map<MergeKey, List<DataFileMeta>> grouped = new LinkedHashMap<>();
+        for (ManifestEntry entry : filesToMerge) {
+            DataFileMeta file = prepareFileForTargetCommit(entry.file(), 
rowTrackingEnabled);
+            grouped.computeIfAbsent(
+                            new MergeKey(
+                                    entry.partition().copy(), entry.bucket(), 
entry.totalBuckets()),
+                            k -> new ArrayList<>())
+                    .add(file);
+        }
+
+        String commitUser = UUID.randomUUID().toString();
+        ManifestCommittable committable = new ManifestCommittable(0);
+        for (Map.Entry<MergeKey, List<DataFileMeta>> e : grouped.entrySet()) {
+            MergeKey key = e.getKey();
+            CommitMessageImpl message =
+                    new CommitMessageImpl(
+                            key.partition,
+                            key.bucket,
+                            key.totalBuckets,
+                            new DataIncrement(
+                                    e.getValue(), Collections.emptyList(), 
Collections.emptyList()),
+                            CompactIncrement.emptyIncrement());
+            committable.addFileCommittable(message);
+        }
+
+        try (FileStoreCommit commit = 
branchTable.store().newCommit(commitUser, branchTable)) {
+            commit.appendCommitCheckConflict(true).commit(committable, true);
+        }
+    }
+
+    private DataFileMeta prepareFileForTargetCommit(DataFileMeta file, boolean 
rowTrackingEnabled) {
+        if (rowTrackingEnabled && file.firstRowId() != null) {
+            // Source files already have row ids assigned in their branch. 
Clear them so the
+            // target branch commit path assigns fresh, non-overlapping row 
ids.
+            return file.newFirstRowId(null);
+        }
+        return file;
+    }
+
+    private static class MergeKey {
+        final BinaryRow partition;
+        final int bucket;
+        final int totalBuckets;
+
+        MergeKey(BinaryRow partition, int bucket, int totalBuckets) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.totalBuckets = totalBuckets;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof MergeKey)) {
+                return false;
+            }
+            MergeKey that = (MergeKey) o;
+            return bucket == that.bucket
+                    && totalBuckets == that.totalBuckets
+                    && Objects.equals(partition, that.partition);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(partition, bucket, totalBuckets);
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
index 4ba9c39475..9f97cf3a09 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
@@ -108,6 +108,11 @@ public class CatalogBranchManager implements BranchManager {
                 });
     }
 
+    @Override
+    public void mergeBranch(String sourceBranch, String targetBranch) {
+        throw new UnsupportedOperationException("Branch merge is not supported 
via catalog.");
+    }
+
     @Override
     public void renameBranch(String fromBranch, String toBranch) {
         executePost(catalog -> catalog.renameBranch(identifier, fromBranch, 
toBranch));
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
index 9c09637d1c..aee1787850 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
@@ -18,9 +18,13 @@
 
 package org.apache.paimon.utils;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.tag.Tag;
@@ -29,8 +33,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -47,18 +54,21 @@ public class FileSystemBranchManager implements BranchManager {
     private final SnapshotManager snapshotManager;
     private final TagManager tagManager;
     private final SchemaManager schemaManager;
+    private final BranchMergeHandler mergeHandler;
 
     public FileSystemBranchManager(
             FileIO fileIO,
             Path path,
             SnapshotManager snapshotManager,
             TagManager tagManager,
-            SchemaManager schemaManager) {
+            SchemaManager schemaManager,
+            BranchMergeHandler mergeHandler) {
         this.fileIO = fileIO;
         this.tablePath = path;
         this.snapshotManager = snapshotManager;
         this.tagManager = tagManager;
         this.schemaManager = schemaManager;
+        this.mergeHandler = mergeHandler;
     }
 
     /** Return the root Directory of branch. */
@@ -210,6 +220,197 @@ public class FileSystemBranchManager implements BranchManager {
         }
     }
 
+    @Override
+    public void mergeBranch(String sourceBranch, String targetBranch) {
+        BranchManager.mergeValidate(sourceBranch, targetBranch);
+        validateMergeBranches(sourceBranch, targetBranch);
+        validateAppendOnlyHistory(sourceBranch, targetBranch);
+        validateAppendOnly(sourceBranch, targetBranch);
+        validateNoDataEvolution(sourceBranch, targetBranch);
+        validateRowTrackingConsistent(sourceBranch, targetBranch);
+        validateLatestSchema(sourceBranch, targetBranch);
+
+        List<ManifestEntry> filesToMerge = computeMergeDiff(sourceBranch, 
targetBranch);
+        if (filesToMerge.isEmpty()) {
+            return;
+        }
+
+        validateMergeFileSchemas(sourceBranch, targetBranch, filesToMerge);
+        mergeHandler.commit(targetBranch, filesToMerge);
+    }
+
+    private void validateMergeBranches(String sourceBranch, String 
targetBranch) {
+        if (!BranchManager.isMainBranch(sourceBranch)) {
+            checkArgument(branchExists(sourceBranch), "Branch '%s' doesn't 
exist.", sourceBranch);
+        }
+        if (!BranchManager.isMainBranch(targetBranch)) {
+            checkArgument(branchExists(targetBranch), "Branch '%s' doesn't 
exist.", targetBranch);
+        }
+
+        SnapshotManager sourceSm = 
snapshotManager.copyWithBranch(sourceBranch);
+        checkArgument(
+                sourceSm.latestSnapshotId() != null,
+                "Cannot merge branch '%s', because it does not have snapshot.",
+                sourceBranch);
+
+        SnapshotManager targetSm = 
snapshotManager.copyWithBranch(targetBranch);
+        checkArgument(
+                targetSm.latestSnapshotId() != null,
+                "Cannot merge into branch '%s', because it does not have 
snapshot.",
+                targetBranch);
+    }
+
+    private void validateLatestSchema(String sourceBranch, String 
targetBranch) {
+        SchemaManager sourceSchemaMgr = new SchemaManager(fileIO, tablePath, 
sourceBranch);
+        SchemaManager targetSchemaMgr = new SchemaManager(fileIO, tablePath, 
targetBranch);
+        TableSchema sourceSchema = sourceSchemaMgr.latest().get();
+        TableSchema targetSchema = targetSchemaMgr.latest().get();
+        checkArgument(
+                sourceSchema.fields().equals(targetSchema.fields()),
+                "Cannot merge branch '%s' into '%s', schema mismatch.",
+                sourceBranch,
+                targetBranch);
+    }
+
+    private void validateMergeFileSchemas(
+            String sourceBranch, String targetBranch, List<ManifestEntry> 
filesToMerge) {
+        SchemaManager sourceSchemaMgr = new SchemaManager(fileIO, tablePath, 
sourceBranch);
+        SchemaManager targetSchemaMgr = new SchemaManager(fileIO, tablePath, 
targetBranch);
+
+        for (Long schemaId :
+                filesToMerge.stream()
+                        .map(entry -> entry.file().schemaId())
+                        .collect(Collectors.toSet())) {
+            checkArgument(
+                    targetSchemaMgr.schemaExists(schemaId),
+                    "Cannot merge branch '%s' into '%s', because target branch 
does not contain "
+                            + "schema id %s required by source files.",
+                    sourceBranch,
+                    targetBranch,
+                    schemaId);
+
+            TableSchema sourceSchema = sourceSchemaMgr.schema(schemaId);
+            TableSchema targetSchema = targetSchemaMgr.schema(schemaId);
+            checkArgument(
+                    sourceSchema.equals(targetSchema),
+                    "Cannot merge branch '%s' into '%s', schema history 
mismatch for schema id %s.",
+                    sourceBranch,
+                    targetBranch,
+                    schemaId);
+        }
+    }
+
+    private void validateAppendOnly(String sourceBranch, String targetBranch) {
+        SchemaManager sourceSchemaMgr = new SchemaManager(fileIO, tablePath, 
sourceBranch);
+        TableSchema sourceSchema = sourceSchemaMgr.latest().get();
+        checkArgument(
+                sourceSchema.primaryKeys().isEmpty(),
+                "Branch merge is only supported for append-only tables, "
+                        + "but branch '%s' has primary keys.",
+                sourceBranch);
+
+        SchemaManager targetSchemaMgr = new SchemaManager(fileIO, tablePath, 
targetBranch);
+        TableSchema targetSchema = targetSchemaMgr.latest().get();
+        checkArgument(
+                targetSchema.primaryKeys().isEmpty(),
+                "Branch merge is only supported for append-only tables, "
+                        + "but branch '%s' has primary keys.",
+                targetBranch);
+    }
+
+    // Branch merge is implemented as a conservative file-level merge. Without 
persisted branch
+    // lineage metadata, we cannot reliably infer a fork point after snapshots 
expire. To preserve
+    // correctness, both branches must retain complete append-only history 
from the first snapshot.
+    // This restriction can be relaxed in the future if branch fork metadata 
is introduced.
+    private void validateAppendOnlyHistory(String sourceBranch, String 
targetBranch) {
+        
validateCompleteAppendOnly(snapshotManager.copyWithBranch(sourceBranch), 
sourceBranch);
+        
validateCompleteAppendOnly(snapshotManager.copyWithBranch(targetBranch), 
targetBranch);
+    }
+
+    private void validateCompleteAppendOnly(SnapshotManager sm, String branch) 
{
+        Long earliest = sm.earliestSnapshotId();
+        Long latest = sm.latestSnapshotId();
+        if (earliest == null || latest == null) {
+            return;
+        }
+        if (earliest != Snapshot.FIRST_SNAPSHOT_ID) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Cannot merge: branch '%s' does not start at 
snapshot %d "
+                                    + "(earliest is %d). "
+                                    + "Branch merge requires complete 
append-only snapshot history.",
+                            branch, Snapshot.FIRST_SNAPSHOT_ID, earliest));
+        }
+        for (long id = Snapshot.FIRST_SNAPSHOT_ID; id <= latest; id++) {
+            if (!sm.snapshotExists(id)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Cannot merge: snapshot %d is missing on 
branch '%s'. "
+                                        + "Branch merge requires complete 
append-only snapshot history.",
+                                id, branch));
+            }
+            Snapshot snapshot = sm.snapshot(id);
+            Snapshot.CommitKind kind = snapshot.commitKind();
+            if (kind != Snapshot.CommitKind.APPEND && kind != 
Snapshot.CommitKind.ANALYZE) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Cannot merge: snapshot %d on branch '%s' has 
commit kind '%s'. "
+                                        + "Branch merge requires complete 
append-only snapshot history.",
+                                id, branch, kind));
+            }
+        }
+    }
+
+    private void validateNoDataEvolution(String sourceBranch, String 
targetBranch) {
+        for (String branch : new String[] {sourceBranch, targetBranch}) {
+            SchemaManager sm = new SchemaManager(fileIO, tablePath, branch);
+            TableSchema schema = sm.latest().get();
+            CoreOptions opts = new CoreOptions(schema.options());
+            checkArgument(
+                    !opts.dataEvolutionEnabled(),
+                    "Branch merge is not supported for data-evolution tables 
(branch '%s').",
+                    branch);
+        }
+    }
+
+    private void validateRowTrackingConsistent(String sourceBranch, String 
targetBranch) {
+        boolean sourceEnabled = isRowTrackingEnabled(sourceBranch);
+        boolean targetEnabled = isRowTrackingEnabled(targetBranch);
+        checkArgument(
+                sourceEnabled == targetEnabled,
+                "Cannot merge branch '%s' into '%s': row-tracking settings 
must match "
+                        + "(source=%s, target=%s).",
+                sourceBranch,
+                targetBranch,
+                sourceEnabled,
+                targetEnabled);
+    }
+
+    private boolean isRowTrackingEnabled(String branch) {
+        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath, 
branch);
+        TableSchema schema = schemaManager.latest().get();
+        return new CoreOptions(schema.options()).rowTrackingEnabled();
+    }
+
+    private List<ManifestEntry> computeMergeDiff(String sourceBranch, String 
targetBranch) {
+        Set<FileEntry.Identifier> targetFileIds =
+                mergeHandler.readBranchFiles(targetBranch).keySet();
+
+        Map<FileEntry.Identifier, ManifestEntry> sourceFiles =
+                mergeHandler.readBranchFiles(sourceBranch);
+
+        List<ManifestEntry> filesToMerge = new ArrayList<>();
+        for (Map.Entry<FileEntry.Identifier, ManifestEntry> entry : 
sourceFiles.entrySet()) {
+            if (!targetFileIds.contains(entry.getKey())) {
+                ManifestEntry manifestEntry = entry.getValue();
+                if (manifestEntry.kind() == FileKind.ADD) {
+                    filesToMerge.add(manifestEntry);
+                }
+            }
+        }
+        return filesToMerge;
+    }
+
     /** Check if a branch exists. */
     public boolean branchExists(String branchName) {
         Path branchPath = branchPath(branchName);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 30ccc652a0..c91dbb4c8e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.append.AppendCompactTask;
 import org.apache.paimon.bucket.DefaultBucketFunction;
 import org.apache.paimon.data.BinaryRow;
@@ -37,6 +38,8 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
@@ -73,6 +76,7 @@ import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchMergeHandler;
 import org.apache.paimon.utils.RoaringBitmap32;
 
 import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -86,8 +90,10 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -117,6 +123,7 @@ import static org.apache.paimon.CoreOptions.WRITE_ONLY;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
 import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
+import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -1699,4 +1706,1101 @@ public class AppendOnlySimpleTableTest extends 
SimpleTableTestBase {
                                 ""));
         return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
     }
+
+    /**
+     * Creates the table used by the branch-merge tests. This implementation simply delegates to
+     * {@code createFileStoreTable()}.
+     *
+     * <p>NOTE(review): the method is protected, presumably so that subclasses can supply a
+     * differently configured table; confirm against the subclasses.
+     */
+    protected FileStoreTable createBranchMergeTable() throws Exception {
+        return createFileStoreTable();
+    }
+
+    /**
+     * Creates the table used by the branch-merge tests with additional options applied. This
+     * implementation simply delegates to {@code createFileStoreTable(Consumer)}.
+     *
+     * @param extraConfigure callback that receives the table options and may modify them
+     */
+    protected FileStoreTable createBranchMergeTable(Consumer<Options> 
extraConfigure)
+            throws Exception {
+        return createFileStoreTable(extraConfigure);
+    }
+
+    /**
+     * Writes one row to main, forks a branch from a tag, writes one more row on each side, merges
+     * the branch into main and verifies that main reads all three rows.
+     */
+    @Test
+    public void testMergeBranch() throws Exception {
+        FileStoreTable table = createBranchMergeTable();
+
+        // Write data to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create branch from tag
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        // Write data to branch
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Write more data to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(2, write.prepareCommit(false, 2));
+        }
+
+        // Merge branch into main
+        table.mergeBranch(BRANCH_NAME, "main");
+
+        // Verify main has data from both sides
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
+    /**
+     * Merges the same branch into main twice, with one branch write before each merge, and
+     * verifies that main ends up with every row exactly once.
+     */
+    @Test
+    public void testMergeBranchMultipleTimes() throws Exception {
+        FileStoreTable table = createBranchMergeTable();
+
+        // Write data to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create branch from tag
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        // First write to branch
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // First merge
+        table.mergeBranch(BRANCH_NAME, "main");
+
+        // Second write to branch
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(2, write.prepareCommit(false, 3));
+        }
+
+        // Second merge
+        table.mergeBranch(BRANCH_NAME, "main");
+
+        // Verify no duplicates: main has all 3 rows exactly once
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
+    /**
+     * Commits the same list of merge files to main twice through {@link BranchMergeHandler} and
+     * expects the second, stale commit to be rejected.
+     *
+     * <p>The file list is computed once, before the first commit, so the second commit tries to
+     * add files that main already contains.
+     */
+    @Test
+    public void testMergeBranchFailsOnStaleDuplicateCommit() throws Exception {
+        FileStoreTable table = createBranchMergeTable();
+
+        // Write one row to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Fork the branch and write one row that exists only there
+        table.createBranch(BRANCH_NAME);
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Compute the source-only files by hand, using the handler directly
+        BranchMergeHandler handler = new BranchMergeHandler(table::switchToBranch);
+        Map<FileEntry.Identifier, ManifestEntry> sourceFiles =
+                handler.readBranchFiles(BRANCH_NAME);
+        Map<FileEntry.Identifier, ManifestEntry> targetFiles = handler.readBranchFiles("main");
+        List<ManifestEntry> filesToMerge =
+                sourceFiles.entrySet().stream()
+                        .filter(entry -> !targetFiles.containsKey(entry.getKey()))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
+
+        // The first commit succeeds; replaying the same file list must fail
+        handler.commit("main", filesToMerge);
+        assertThatThrownBy(() -> handler.commit("main", filesToMerge))
+                .satisfies(anyCauseMatches(RuntimeException.class, "Trying to add file"));
+    }
+
+    /**
+     * Merges the branch into main and then main into the branch after both sides diverged, and
+     * verifies that both end up reading the same three rows.
+     */
+    @Test
+    public void testMergeBranchBidirectional() throws Exception {
+        FileStoreTable table = createBranchMergeTable();
+
+        // Write shared data to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create branch from tag
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        // Write to branch
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Write to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(2, write.prepareCommit(false, 2));
+        }
+
+        // Merge branch -> main
+        table.mergeBranch(BRANCH_NAME, "main");
+
+        // Merge main -> branch
+        table.mergeBranch("main", BRANCH_NAME);
+
+        // Verify both have the same data without duplicates
+        List<String> mainData =
+                getResult(
+                        table.newRead(),
+                        
toSplits(table.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+        List<String> branchData =
+                getResult(
+                        tableBranch.newRead(),
+                        
toSplits(tableBranch.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+
+        assertThat(mainData)
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+        assertThat(branchData).containsExactlyInAnyOrderElementsOf(mainData);
+    }
+
+    /**
+     * Merges a branch that received no writes after being created from a tag and verifies that
+     * the latest snapshot id of main is unchanged by the merge.
+     */
+    @Test
+    public void testMergeBranchEmptyDiff() throws Exception {
+        FileStoreTable table = createBranchMergeTable();
+
+        // Write data to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create branch from tag (has same data as main)
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+
+        // Merge should be a no-op (no exception, no new snapshot)
+        long snapshotBefore = table.snapshotManager().latestSnapshotId();
+        table.mergeBranch(BRANCH_NAME, "main");
+        long snapshotAfter = table.snapshotManager().latestSnapshotId();
+        assertThat(snapshotAfter).isEqualTo(snapshotBefore);
+    }
+
+    /**
+     * Adds a column on main after the branch was created and expects the merge of the branch
+     * into main to fail with a schema mismatch error.
+     */
+    @Test
+    public void testMergeBranchSchemaConflict() throws Exception {
+        FileStoreTable table = createBranchMergeTable();
+
+        // Write data to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create branch from tag
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+
+        // Modify schema on main (add a column)
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
+        schemaManager.commitChanges(SchemaChange.addColumn("new_col", 
DataTypes.INT()));
+
+        // Merge should fail due to schema mismatch
+        assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Cannot merge branch 'branch1' into 'main', 
schema mismatch."));
+    }
+
+    /**
+     * Adds and then drops a column on each side ({@code source_col} on the branch, with a batch
+     * write in between, and {@code target_col} on main) and expects the merge to fail with a
+     * schema history mismatch for schema id 1.
+     */
+    @Test
+    public void testMergeBranchSchemaHistoryConflict() throws Exception {
+        FileStoreTable table = createBranchMergeTable();
+
+        // Write one row to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create branch from tag
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+
+        // Branch side: add source_col, write a row, then drop source_col again
+        SchemaManager branchSchemaManager =
+                new SchemaManager(table.fileIO(), table.location(), 
BRANCH_NAME);
+        branchSchemaManager.commitChanges(SchemaChange.addColumn("source_col", 
DataTypes.INT()));
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+        try (BatchTableWrite write =
+                        
tableBranch.newBatchWriteBuilder().newWrite().withWriteType(ROW_TYPE);
+                BatchTableCommit commit = 
tableBranch.newBatchWriteBuilder().newCommit()) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(write.prepareCommit());
+        }
+        branchSchemaManager.commitChanges(
+                
Collections.singletonList(SchemaChange.dropColumn("source_col")));
+
+        // Main side: add target_col and drop it again
+        SchemaManager mainSchemaManager = new SchemaManager(table.fileIO(), 
table.location());
+        mainSchemaManager.commitChanges(SchemaChange.addColumn("target_col", 
DataTypes.INT()));
+        mainSchemaManager.commitChanges(
+                
Collections.singletonList(SchemaChange.dropColumn("target_col")));
+
+        // Merge should fail because the schema histories differ
+        assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "schema history mismatch for schema id 1"));
+    }
+
+    /**
+     * Runs the basic diverge-and-merge scenario on a table with row tracking enabled and checks
+     * the merged rows, the row-id ranges of the files and the next row id of the latest snapshot.
+     */
+    @Test
+    public void testMergeBranchRowTrackingTable() throws Exception {
+        FileStoreTable table =
+                createBranchMergeTable(
+                        options -> 
options.set(CoreOptions.ROW_TRACKING_ENABLED, true));
+
+        // Write one row to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create branch from tag
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        // Write one row to the branch
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Write one more row to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(2, write.prepareCommit(false, 2));
+        }
+
+        table.mergeBranch(BRANCH_NAME, "main");
+
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Three rows were written in total, and 3 is the expected next row id
+        assertRowIdRangesNonOverlapping(table);
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        assertThat(snapshot.nextRowId()).isEqualTo(3L);
+    }
+
+    /**
+     * Merges a branch into main twice on a table with row tracking enabled, with one branch
+     * write before each merge, and checks the rows, the row-id ranges and the next row id.
+     */
+    @Test
+    public void testMergeBranchRowTrackingMultipleTimes() throws Exception {
+        FileStoreTable table =
+                createBranchMergeTable(
+                        options -> 
options.set(CoreOptions.ROW_TRACKING_ENABLED, true));
+
+        // Write one row to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create branch from tag
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        // First write to branch + merge
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+        table.mergeBranch(BRANCH_NAME, "main");
+
+        // Second write to branch + merge
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(2, write.prepareCommit(false, 3));
+        }
+        table.mergeBranch(BRANCH_NAME, "main");
+
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        // Three rows were written in total, and 3 is the expected next row id
+        assertRowIdRangesNonOverlapping(table);
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        assertThat(snapshot.nextRowId()).isEqualTo(3L);
+    }
+
+    /**
+     * Merges a branch holding two new rows into main after main independently received three
+     * new rows, on a table with row tracking enabled, and checks the row count, the row-id
+     * ranges and the next row id.
+     */
+    @Test
+    public void testMergeBranchRowTrackingAfterTargetWrites() throws Exception 
{
+        FileStoreTable table =
+                createBranchMergeTable(
+                        options -> 
options.set(CoreOptions.ROW_TRACKING_ENABLED, true));
+
+        // Write one row to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create branch from tag
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        // Write 2 rows to branch
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            write.write(rowData(1, 11, 101L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Write 3 rows to main independently (advances main nextRowId)
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(2, 20, 200L));
+            write.write(rowData(2, 21, 201L));
+            write.write(rowData(2, 22, 202L));
+            commit.commit(2, write.prepareCommit(false, 2));
+        }
+
+        table.mergeBranch(BRANCH_NAME, "main");
+
+        // 1 shared row + 2 branch rows + 3 main rows
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .hasSize(6);
+
+        assertRowIdRangesNonOverlapping(table);
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        assertThat(snapshot.nextRowId()).isEqualTo(6L);
+    }
+
+    /**
+     * Merges {@code branchA} into {@code branchB} (neither is main) on a table with row tracking
+     * enabled and checks the rows, the row-id ranges and the next row id of {@code branchB}.
+     */
+    @Test
+    public void testMergeBranchRowTrackingBetweenNonMainBranches() throws 
Exception {
+        FileStoreTable table =
+                createBranchMergeTable(
+                        options -> 
options.set(CoreOptions.ROW_TRACKING_ENABLED, true));
+
+        // Write one row to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create both branches from the same tag
+        table.createTag("tag1", 1);
+        table.createBranch("branchA", "tag1");
+        table.createBranch("branchB", "tag1");
+        FileStoreTable tableA = table.switchToBranch("branchA");
+        FileStoreTable tableB = table.switchToBranch("branchB");
+
+        // Write one row to branchA
+        try (StreamTableWrite write = tableA.newWrite(commitUser);
+                StreamTableCommit commit = tableA.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Write one row to branchB
+        try (StreamTableWrite write = tableB.newWrite(commitUser);
+                StreamTableCommit commit = tableB.newCommit(commitUser)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(2, write.prepareCommit(false, 2));
+        }
+
+        table.mergeBranch("branchA", "branchB");
+
+        // Obtain a new table handle for branchB before reading it back
+        tableB = table.switchToBranch("branchB");
+        assertThat(
+                        getResult(
+                                tableB.newRead(),
+                                
toSplits(tableB.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        assertRowIdRangesNonOverlapping(tableB);
+        Snapshot snapshot = tableB.snapshotManager().latestSnapshot();
+        assertThat(snapshot.nextRowId()).isEqualTo(3L);
+    }
+
+    /**
+     * Commits a branch schema whose options no longer contain {@code row-tracking.enabled}
+     * while main keeps it enabled, and expects the merge to fail because the row-tracking
+     * settings differ.
+     */
+    @Test
+    public void testMergeBranchRowTrackingMismatch() throws Exception {
+        FileStoreTable table =
+                createBranchMergeTable(
+                        options -> 
options.set(CoreOptions.ROW_TRACKING_ENABLED, true));
+
+        // Write one row to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create branch from tag
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        // Write one row to the branch
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Directly write a new schema to the branch with row-tracking disabled
+        SchemaManager branchSchemaManager =
+                new SchemaManager(table.fileIO(), table.location(), 
BRANCH_NAME);
+        TableSchema branchSchema = branchSchemaManager.latest().get();
+        Map<String, String> newOptions = new HashMap<>(branchSchema.options());
+        newOptions.remove("row-tracking.enabled");
+        // Copy of the latest branch schema with the next schema id and the reduced options
+        TableSchema mismatchedSchema =
+                new TableSchema(
+                        branchSchema.version(),
+                        branchSchema.id() + 1,
+                        branchSchema.fields(),
+                        branchSchema.highestFieldId(),
+                        branchSchema.partitionKeys(),
+                        branchSchema.primaryKeys(),
+                        newOptions,
+                        branchSchema.comment(),
+                        branchSchema.timeMillis());
+        branchSchemaManager.commit(mismatchedSchema);
+
+        assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "row-tracking settings must match"));
+    }
+
+    /**
+     * Merges a branch into main after main advanced by two further commits, on a table with row
+     * tracking enabled, then writes to main again and repeats the merge. The second merge is
+     * expected to leave the latest snapshot id unchanged.
+     */
+    @Test
+    public void testMergeBranchRowTrackingStaleMerge() throws Exception {
+        FileStoreTable table =
+                createBranchMergeTable(
+                        options -> 
options.set(CoreOptions.ROW_TRACKING_ENABLED, true));
+
+        // Write one row to main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create branch from tag
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        // Write to branch
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Write to main multiple times to advance nextRowId
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(2, write.prepareCommit(false, 2));
+        }
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(3, 30, 300L));
+            write.write(rowData(3, 31, 301L));
+            commit.commit(3, write.prepareCommit(false, 3));
+        }
+
+        // Merge: branch file should get firstRowId after all main files
+        table.mergeBranch(BRANCH_NAME, "main");
+
+        // 4 rows written to main + 1 row written to the branch
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .hasSize(5);
+
+        assertRowIdRangesNonOverlapping(table);
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        assertThat(snapshot.nextRowId()).isEqualTo(5L);
+
+        // Write more to main, then merge again (branch has no new data, 
should be no-op)
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(4, 40, 400L));
+            commit.commit(4, write.prepareCommit(false, 4));
+        }
+
+        long snapshotIdBefore = table.snapshotManager().latestSnapshotId();
+        table.mergeBranch(BRANCH_NAME, "main");
+        long snapshotIdAfter = table.snapshotManager().latestSnapshotId();
+
+        // Second merge should be no-op (branch file already in target)
+        assertThat(snapshotIdAfter).isEqualTo(snapshotIdBefore);
+
+        // The extra main row written before the second merge is the sixth row overall
+        assertRowIdRangesNonOverlapping(table);
+        Snapshot finalSnapshot = table.snapshotManager().latestSnapshot();
+        assertThat(finalSnapshot.nextRowId()).isEqualTo(6L);
+    }
+
+    /**
+     * Asserts that the row-id ranges of the data files in the latest snapshot of {@code table}
+     * are pairwise disjoint.
+     *
+     * <p>Every file with a non-null first row id contributes the inclusive range {@code
+     * [firstRowId, firstRowId + rowCount - 1]}; files without a first row id are ignored.
+     *
+     * @param table the table whose latest snapshot is inspected
+     */
+    private void assertRowIdRangesNonOverlapping(FileStoreTable table) {
+        ManifestList manifests = table.store().manifestListFactory().create();
+        ManifestFile manifestReader = table.store().manifestFileFactory().create();
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+
+        Map<FileEntry.Identifier, ManifestEntry> mergedFiles = new LinkedHashMap<>();
+        FileEntry.mergeEntries(
+                manifestReader, manifests.readDataManifests(latest), mergedFiles, null);
+
+        // Each element is an inclusive {first, last} row-id interval.
+        List<long[]> intervals = new ArrayList<>();
+        for (ManifestEntry fileEntry : mergedFiles.values()) {
+            if (fileEntry.file().firstRowId() == null) {
+                continue;
+            }
+            long first = fileEntry.file().firstRowId();
+            long last = first + fileEntry.file().rowCount() - 1;
+            intervals.add(new long[] {first, last});
+        }
+        intervals.sort(Comparator.comparingLong(interval -> interval[0]));
+
+        // After sorting by start, only neighbouring intervals need to be compared.
+        long[] previous = null;
+        for (long[] current : intervals) {
+            if (previous != null) {
+                assertTrue(
+                        current[0] > previous[1],
+                        String.format(
+                                "Row-id ranges overlap: [%d, %d] and [%d, %d]",
+                                previous[0], previous[1], current[0], current[1]));
+            }
+            previous = current;
+        }
+    }
+
+    /** Expects merging a branch into itself to be rejected with an IllegalArgumentException. */
+    @Test
+    public void testMergeBranchSameBranch() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        // Source and target are the same branch name
+        assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, BRANCH_NAME))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Cannot merge branch 'branch1' into itself."));
+    }
+
+    /**
+     * Writes rows with the same first field value on main and on the branch, merges the branch
+     * into main and verifies that main reads both rows.
+     */
+    @Test
+    public void testMergeBranchSamePartition() throws Exception {
+        FileStoreTable table = createBranchMergeTable();
+
+        // Write data to main (partition pt=0)
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create branch from tag
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        // Write to branch with same partition pt=0
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(0, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Merge branch into main
+        table.mergeBranch(BRANCH_NAME, "main");
+
+        // Both files coexist in the same partition
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "0|10|100|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
+    /** Expects a merge whose source branch does not exist to be rejected. */
+    @Test
+    public void testMergeBranchNonExistentBranch() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        // Write one row to main; no branch is created in this test
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        assertThatThrownBy(() -> table.mergeBranch("nonexistent", "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch 'nonexistent' doesn't exist."));
+    }
+
+    @Test
+    public void testMergeBranchMultiBucket() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.BUCKET, 2);
+                        });
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            write.write(rowData(0, 1, 1L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            write.write(rowData(1, 11, 110L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        table.mergeBranch(BRANCH_NAME, "main");
+
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "0|1|1|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|11|110|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
+    @Test
+    public void testMergeBranchNonExistentTargetBranch() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        table.createBranch(BRANCH_NAME);
+
+        assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "nonexistent"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch 'nonexistent' doesn't exist."));
+    }
+
+    @Test
+    public void testMergeBranchBetweenNonMainBranches() throws Exception {
+        FileStoreTable table = createBranchMergeTable();
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Create two branches from tag so they share the same base data
+        table.createTag("tag1", 1);
+        table.createBranch("branchA", "tag1");
+        table.createBranch("branchB", "tag1");
+        FileStoreTable tableA = table.switchToBranch("branchA");
+        FileStoreTable tableB = table.switchToBranch("branchB");
+
+        // Write to branchA
+        try (StreamTableWrite write = tableA.newWrite(commitUser);
+                StreamTableCommit commit = tableA.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Write to branchB
+        try (StreamTableWrite write = tableB.newWrite(commitUser);
+                StreamTableCommit commit = tableB.newCommit(commitUser)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(2, write.prepareCommit(false, 2));
+        }
+
+        // Merge branchA into branchB
+        table.mergeBranch("branchA", "branchB");
+
+        // Reload branchB table to see changes
+        tableB = table.switchToBranch("branchB");
+        assertThat(
+                        getResult(
+                                tableB.newRead(),
+                                toSplits(tableB.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
+    @Test
+    public void testMergeBranchAfterSnapshotExpiration() throws Exception {
+        FileStoreTable table =
+                createBranchMergeTable(
+                        conf -> {
+                            conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1);
+                            conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 1);
+                        });
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        for (int i = 2; i < 5; i++) {
+            try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                    StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+                write.write(rowData(i, i * 10, (long) i * 100));
+                commit.commit(i, write.prepareCommit(false, i + 1));
+            }
+        }
+
+        tableBranch.newExpireSnapshots().config(tableBranch.coreOptions().expireConfig()).expire();
+
+        // After expiration, baseline snapshot is gone — merge should fail
+        assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch merge requires complete append-only snapshot history"));
+    }
+
+    @Test
+    public void testMergeBranchRejectsNonAppendHistory() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        // Write data to branch
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(0, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Perform INSERT OVERWRITE on branch — creates an OVERWRITE snapshot
+        List<CommitMessage> commitMessages;
+        try (BatchTableWrite write =
+                tableBranch.newBatchWriteBuilder().withOverwrite().newWrite()) {
+            write.write(rowData(0, 20, 200L));
+            commitMessages = write.prepareCommit();
+        }
+        try (BatchTableCommit commit =
+                tableBranch.newBatchWriteBuilder().withOverwrite().newCommit()) {
+            commit.commit(commitMessages);
+        }
+
+        assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch merge requires complete append-only snapshot history"));
+    }
+
+    @Test
+    public void testMergeBranchRejectsTargetOverwrite() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(0, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // INSERT OVERWRITE on target (main)
+        List<CommitMessage> commitMessages;
+        try (BatchTableWrite write = table.newBatchWriteBuilder().withOverwrite().newWrite()) {
+            write.write(rowData(0, 20, 200L));
+            commitMessages = write.prepareCommit();
+        }
+        try (BatchTableCommit commit = table.newBatchWriteBuilder().withOverwrite().newCommit()) {
+            commit.commit(commitMessages);
+        }
+
+        assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch merge requires complete append-only snapshot history"));
+    }
+
+    @Test
+    public void testMergeBranchRejectsSourceOverwrite() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(0, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // INSERT OVERWRITE on source (branch)
+        List<CommitMessage> commitMessages;
+        try (BatchTableWrite write =
+                tableBranch.newBatchWriteBuilder().withOverwrite().newWrite()) {
+            write.write(rowData(0, 20, 200L));
+            commitMessages = write.prepareCommit();
+        }
+        try (BatchTableCommit commit =
+                tableBranch.newBatchWriteBuilder().withOverwrite().newCommit()) {
+            commit.commit(commitMessages);
+        }
+
+        assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch merge requires complete append-only snapshot history"));
+    }
+
+    @Test
+    public void testMergeBranchRejectsSourceExpiredSnapshots() throws Exception {
+        FileStoreTable table =
+                createBranchMergeTable(
+                        conf -> {
+                            conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1);
+                            conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 1);
+                        });
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        for (int i = 1; i < 5; i++) {
+            try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                    StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+                write.write(rowData(i, i * 10, (long) i * 100));
+                commit.commit(i, write.prepareCommit(false, i + 1));
+            }
+        }
+
+        tableBranch.newExpireSnapshots().config(tableBranch.coreOptions().expireConfig()).expire();
+
+        assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch merge requires complete append-only snapshot history"));
+    }
+
+    @Test
+    public void testMergeBranchRejectsTargetExpiredSnapshots() throws Exception {
+        FileStoreTable table =
+                createBranchMergeTable(
+                        conf -> {
+                            conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1);
+                            conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 1);
+                        });
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Write multiple commits on main to allow expiration
+        for (int i = 2; i < 5; i++) {
+            try (StreamTableWrite write = table.newWrite(commitUser);
+                    StreamTableCommit commit = table.newCommit(commitUser)) {
+                write.write(rowData(i, i * 10, (long) i * 100));
+                commit.commit(i, write.prepareCommit(false, i + 1));
+            }
+        }
+
+        table.newExpireSnapshots().config(table.coreOptions().expireConfig()).expire();
+
+        assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch merge requires complete append-only snapshot history"));
+    }
+
+    @Test
+    public void testMergeBranchFromTagSucceeds() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(2, write.prepareCommit(false, 2));
+        }
+
+        table.mergeBranch(BRANCH_NAME, "main");
+
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
+    @Test
+    public void testMergePlainBranchSucceedsWithCompleteHistory() throws Exception {
+        FileStoreTable table = createBranchMergeTable();
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        table.createBranch(BRANCH_NAME);
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(2, 20, 200L));
+            commit.commit(2, write.prepareCommit(false, 2));
+        }
+
+        table.mergeBranch(BRANCH_NAME, "main");
+
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                toSplits(table.newSnapshotReader().read().dataSplits()),
+                                BATCH_ROW_TO_STRING))
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 9401cce832..80cfb975bd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -144,6 +144,7 @@ import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
 import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
 import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
 import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
+import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -2672,4 +2673,21 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
                                 ""));
         return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema);
     }
+
+    @Test
+    public void testMergeBranchPrimaryKeyTable() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+
+        assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(anyCauseMatches(IllegalArgumentException.class, "append-only tables"));
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java
index c74b53d3f3..109303a928 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java
@@ -69,7 +69,7 @@ class FileSystemBranchManagerTest {
         tagManager = new TagManager(fileIO, tablePath);
         branchManager =
                 new FileSystemBranchManager(
-                        fileIO, tablePath, snapshotManager, tagManager, schemaManager);
+                        fileIO, tablePath, snapshotManager, tagManager, schemaManager, null);
     }
 
     @Test

Reply via email to