This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f336ba868a [core][rest] Add branch merge support for append-only tables (#7882)
f336ba868a is described below
commit f336ba868a2170f6dc05385ada2a815cf1f0764b
Author: Junrui Lee <[email protected]>
AuthorDate: Thu May 21 21:37:03 2026 +0800
[core][rest] Add branch merge support for append-only tables (#7882)
This PR is the first part of #7863, focusing on the core APIs,
implementation, and REST support for branch merge.
It adds `mergeBranch`, which incrementally adds data files that exist
only in the source branch to the target branch, without replacing either
branch. Correctness is guarded by the immutable table option
`'branch-merge.enabled' = 'true'`, which enforces the pure-append
invariant required by file-level branch merge: compaction and `INSERT
OVERWRITE` are rejected, and deletion vectors are not supported.
---
docs/static/rest-catalog-open-api.yaml | 2 +-
.../java/org/apache/paimon/io/DataFileMeta.java | 2 +
.../org/apache/paimon/io/PojoDataFileMeta.java | 25 +
.../paimon/privilege/PrivilegedFileStoreTable.java | 6 +
.../paimon/table/AbstractFileStoreTable.java | 13 +-
.../paimon/table/DelegatedFileStoreTable.java | 5 +
.../java/org/apache/paimon/table/FormatTable.java | 5 +
.../org/apache/paimon/table/ReadonlyTable.java | 8 +
.../main/java/org/apache/paimon/table/Table.java | 4 +
.../org/apache/paimon/utils/BranchManager.java | 17 +
.../apache/paimon/utils/BranchMergeHandler.java | 144 +++
.../apache/paimon/utils/CatalogBranchManager.java | 5 +
.../paimon/utils/FileSystemBranchManager.java | 203 +++-
.../paimon/table/AppendOnlySimpleTableTest.java | 1104 ++++++++++++++++++++
.../paimon/table/PrimaryKeySimpleTableTest.java | 18 +
.../paimon/utils/FileSystemBranchManagerTest.java | 2 +-
16 files changed, 1559 insertions(+), 4 deletions(-)
diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml
index ba4c8bed4b..e310f398f8 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -3615,4 +3615,4 @@ components:
securitySchemes:
BearerAuth:
type: http
- scheme: bearer
\ No newline at end of file
+ scheme: bearer
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index a7a4de0b5c..23b34947bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -329,6 +329,8 @@ public interface DataFileMeta {
DataFileMeta assignFirstRowId(long firstRowId);
+ DataFileMeta newFirstRowId(@Nullable Long newFirstRowId);
+
default List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(this));
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
index 5003974e8e..9e845b26fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
@@ -379,6 +379,31 @@ public class PojoDataFileMeta implements DataFileMeta {
writeCols);
}
+ @Override
+ public PojoDataFileMeta newFirstRowId(@Nullable Long newFirstRowId) {
+ return new PojoDataFileMeta(
+ fileName,
+ fileSize,
+ rowCount,
+ minKey,
+ maxKey,
+ keyStats,
+ valueStats,
+ minSequenceNumber,
+ maxSequenceNumber,
+ schemaId,
+ level,
+ extraFiles,
+ creationTime,
+ deleteRowCount,
+ embeddedIndex,
+ fileSource,
+ valueStatsCols,
+ externalPath,
+ newFirstRowId,
+ writeCols);
+ }
+
@Override
public PojoDataFileMeta copy(List<String> newExtraFiles) {
return new PojoDataFileMeta(
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index a2c3e62c36..35dfd308df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -204,6 +204,12 @@ public class PrivilegedFileStoreTable extends DelegatedFileStoreTable {
wrapped.fastForward(branchName);
}
+ @Override
+ public void mergeBranch(String sourceBranch, String targetBranch) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.mergeBranch(sourceBranch, targetBranch);
+ }
+
@Override
public ExpireSnapshots newExpireSnapshots() {
privilegeChecker.assertCanInsert(identifier);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 522d25c7d3..df42249159 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -55,6 +55,7 @@ import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.BranchMergeHandler;
import org.apache.paimon.utils.CatalogBranchManager;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.DVMetaCache;
@@ -737,6 +738,11 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
branchManager().fastForward(branchName);
}
+ @Override
+ public void mergeBranch(String sourceBranch, String targetBranch) {
+ branchManager().mergeBranch(sourceBranch, targetBranch);
+ }
+
@Override
public TagManager tagManager() {
return new TagManager(fileIO, path, currentBranch(), coreOptions());
@@ -749,7 +755,12 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
return new
CatalogBranchManager(catalogEnvironment.catalogLoader(), identifier());
}
return new FileSystemBranchManager(
- fileIO, path, snapshotManager(), tagManager(),
schemaManager());
+ fileIO,
+ path,
+ snapshotManager(),
+ tagManager(),
+ schemaManager(),
+ new BranchMergeHandler(this::switchToBranch));
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 1f234d81ff..7ba4bc20a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -295,6 +295,11 @@ public abstract class DelegatedFileStoreTable implements FileStoreTable {
wrapped.fastForward(branchName);
}
+ @Override
+ public void mergeBranch(String sourceBranch, String targetBranch) {
+ wrapped.mergeBranch(sourceBranch, targetBranch);
+ }
+
@Override
public ExpireSnapshots newExpireSnapshots() {
return wrapped.newExpireSnapshots();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index 7f13faf753..aee60f65ee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -427,6 +427,11 @@ public interface FormatTable extends Table {
throw new UnsupportedOperationException();
}
+ @Override
+ default void mergeBranch(String sourceBranch, String targetBranch) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
default ExpireSnapshots newExpireSnapshots() {
throw new UnsupportedOperationException();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index b685f86e3e..8d7add5bc5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -281,6 +281,14 @@ public interface ReadonlyTable extends InnerTable {
this.getClass().getSimpleName()));
}
+ @Override
+ default void mergeBranch(String sourceBranch, String targetBranch) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Readonly Table %s does not support mergeBranch.",
+ this.getClass().getSimpleName()));
+ }
+
@Override
default ExpireSnapshots newExpireSnapshots() {
throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 98c00c0101..f58259b1fc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -210,6 +210,10 @@ public interface Table extends Serializable {
@Experimental
void fastForward(String branchName);
+ /** Merge source branch into target branch (append-only tables only). */
+ @Experimental
+ void mergeBranch(String sourceBranch, String targetBranch);
+
/** Manually expire snapshots, parameters can be controlled independently
of table options. */
@Experimental
ExpireSnapshots newExpireSnapshots();
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index c830a799bf..085f04747c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -59,6 +59,8 @@ public interface BranchManager {
void fastForward(String branchName);
+ void mergeBranch(String sourceBranch, String targetBranch);
+
void renameBranch(String fromBranch, String toBranch);
List<String> branches();
@@ -98,6 +100,21 @@ public interface BranchManager {
branchName);
}
+ static void mergeValidate(String sourceBranch, String targetBranch) {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(sourceBranch),
+ "Source branch name '%s' is blank.",
+ sourceBranch);
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(targetBranch),
+ "Target branch name '%s' is blank.",
+ targetBranch);
+ checkArgument(
+ !sourceBranch.equals(targetBranch),
+ "Cannot merge branch '%s' into itself.",
+ sourceBranch);
+ }
+
static void fastForwardValidate(String branchName, String currentBranch) {
checkArgument(
!branchName.equals(DEFAULT_MAIN_BRANCH),
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java
new file mode 100644
index 0000000000..0327fd0bcf
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchMergeHandler.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.operation.FileStoreCommit;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Branch merge handler backed by {@link FileStoreTable}. */
+public class BranchMergeHandler {
+
+ private final Function<String, FileStoreTable> branchTableFactory;
+
+ public BranchMergeHandler(Function<String, FileStoreTable>
branchTableFactory) {
+ this.branchTableFactory = branchTableFactory;
+ }
+
+ public Map<FileEntry.Identifier, ManifestEntry> readBranchFiles(String
branch) {
+ FileStoreTable branchTable = branchTableFactory.apply(branch);
+ Snapshot snapshot = branchTable.snapshotManager().latestSnapshot();
+ checkArgument(
+ snapshot != null,
+ "Cannot read branch '%s', because it does not have any
snapshot.",
+ branch);
+ ManifestList manifestList =
branchTable.store().manifestListFactory().create();
+ ManifestFile manifestFile =
branchTable.store().manifestFileFactory().create();
+ Map<FileEntry.Identifier, ManifestEntry> files = new LinkedHashMap<>();
+ FileEntry.mergeEntries(manifestFile,
manifestList.readDataManifests(snapshot), files, null);
+ return files;
+ }
+
+ public void commit(String targetBranch, List<ManifestEntry> filesToMerge) {
+ FileStoreTable branchTable = branchTableFactory.apply(targetBranch);
+ boolean rowTrackingEnabled =
+ new
CoreOptions(branchTable.schema().options()).rowTrackingEnabled();
+
+ Map<MergeKey, List<DataFileMeta>> grouped = new LinkedHashMap<>();
+ for (ManifestEntry entry : filesToMerge) {
+ DataFileMeta file = prepareFileForTargetCommit(entry.file(),
rowTrackingEnabled);
+ grouped.computeIfAbsent(
+ new MergeKey(
+ entry.partition().copy(), entry.bucket(),
entry.totalBuckets()),
+ k -> new ArrayList<>())
+ .add(file);
+ }
+
+ String commitUser = UUID.randomUUID().toString();
+ ManifestCommittable committable = new ManifestCommittable(0);
+ for (Map.Entry<MergeKey, List<DataFileMeta>> e : grouped.entrySet()) {
+ MergeKey key = e.getKey();
+ CommitMessageImpl message =
+ new CommitMessageImpl(
+ key.partition,
+ key.bucket,
+ key.totalBuckets,
+ new DataIncrement(
+ e.getValue(), Collections.emptyList(),
Collections.emptyList()),
+ CompactIncrement.emptyIncrement());
+ committable.addFileCommittable(message);
+ }
+
+ try (FileStoreCommit commit =
branchTable.store().newCommit(commitUser, branchTable)) {
+ commit.appendCommitCheckConflict(true).commit(committable, true);
+ }
+ }
+
+ private DataFileMeta prepareFileForTargetCommit(DataFileMeta file, boolean
rowTrackingEnabled) {
+ if (rowTrackingEnabled && file.firstRowId() != null) {
+ // Source files already have row ids assigned in their branch.
Clear them so the
+ // target branch commit path assigns fresh, non-overlapping row
ids.
+ return file.newFirstRowId(null);
+ }
+ return file;
+ }
+
+ private static class MergeKey {
+ final BinaryRow partition;
+ final int bucket;
+ final int totalBuckets;
+
+ MergeKey(BinaryRow partition, int bucket, int totalBuckets) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.totalBuckets = totalBuckets;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof MergeKey)) {
+ return false;
+ }
+ MergeKey that = (MergeKey) o;
+ return bucket == that.bucket
+ && totalBuckets == that.totalBuckets
+ && Objects.equals(partition, that.partition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partition, bucket, totalBuckets);
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
index 4ba9c39475..9f97cf3a09 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
@@ -108,6 +108,11 @@ public class CatalogBranchManager implements BranchManager {
});
}
+ @Override
+ public void mergeBranch(String sourceBranch, String targetBranch) {
+ throw new UnsupportedOperationException("Branch merge is not supported
via catalog.");
+ }
+
@Override
public void renameBranch(String fromBranch, String toBranch) {
executePost(catalog -> catalog.renameBranch(identifier, fromBranch,
toBranch));
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
index 9c09637d1c..aee1787850 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
@@ -18,9 +18,13 @@
package org.apache.paimon.utils;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.tag.Tag;
@@ -29,8 +33,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -47,18 +54,21 @@ public class FileSystemBranchManager implements BranchManager {
private final SnapshotManager snapshotManager;
private final TagManager tagManager;
private final SchemaManager schemaManager;
+ private final BranchMergeHandler mergeHandler;
public FileSystemBranchManager(
FileIO fileIO,
Path path,
SnapshotManager snapshotManager,
TagManager tagManager,
- SchemaManager schemaManager) {
+ SchemaManager schemaManager,
+ BranchMergeHandler mergeHandler) {
this.fileIO = fileIO;
this.tablePath = path;
this.snapshotManager = snapshotManager;
this.tagManager = tagManager;
this.schemaManager = schemaManager;
+ this.mergeHandler = mergeHandler;
}
/** Return the root Directory of branch. */
@@ -210,6 +220,197 @@ public class FileSystemBranchManager implements BranchManager {
}
}
+ @Override
+ public void mergeBranch(String sourceBranch, String targetBranch) {
+ BranchManager.mergeValidate(sourceBranch, targetBranch);
+ validateMergeBranches(sourceBranch, targetBranch);
+ validateAppendOnlyHistory(sourceBranch, targetBranch);
+ validateAppendOnly(sourceBranch, targetBranch);
+ validateNoDataEvolution(sourceBranch, targetBranch);
+ validateRowTrackingConsistent(sourceBranch, targetBranch);
+ validateLatestSchema(sourceBranch, targetBranch);
+
+ List<ManifestEntry> filesToMerge = computeMergeDiff(sourceBranch,
targetBranch);
+ if (filesToMerge.isEmpty()) {
+ return;
+ }
+
+ validateMergeFileSchemas(sourceBranch, targetBranch, filesToMerge);
+ mergeHandler.commit(targetBranch, filesToMerge);
+ }
+
+ private void validateMergeBranches(String sourceBranch, String
targetBranch) {
+ if (!BranchManager.isMainBranch(sourceBranch)) {
+ checkArgument(branchExists(sourceBranch), "Branch '%s' doesn't
exist.", sourceBranch);
+ }
+ if (!BranchManager.isMainBranch(targetBranch)) {
+ checkArgument(branchExists(targetBranch), "Branch '%s' doesn't
exist.", targetBranch);
+ }
+
+ SnapshotManager sourceSm =
snapshotManager.copyWithBranch(sourceBranch);
+ checkArgument(
+ sourceSm.latestSnapshotId() != null,
+ "Cannot merge branch '%s', because it does not have snapshot.",
+ sourceBranch);
+
+ SnapshotManager targetSm =
snapshotManager.copyWithBranch(targetBranch);
+ checkArgument(
+ targetSm.latestSnapshotId() != null,
+ "Cannot merge into branch '%s', because it does not have
snapshot.",
+ targetBranch);
+ }
+
+ private void validateLatestSchema(String sourceBranch, String
targetBranch) {
+ SchemaManager sourceSchemaMgr = new SchemaManager(fileIO, tablePath,
sourceBranch);
+ SchemaManager targetSchemaMgr = new SchemaManager(fileIO, tablePath,
targetBranch);
+ TableSchema sourceSchema = sourceSchemaMgr.latest().get();
+ TableSchema targetSchema = targetSchemaMgr.latest().get();
+ checkArgument(
+ sourceSchema.fields().equals(targetSchema.fields()),
+ "Cannot merge branch '%s' into '%s', schema mismatch.",
+ sourceBranch,
+ targetBranch);
+ }
+
+ private void validateMergeFileSchemas(
+ String sourceBranch, String targetBranch, List<ManifestEntry>
filesToMerge) {
+ SchemaManager sourceSchemaMgr = new SchemaManager(fileIO, tablePath,
sourceBranch);
+ SchemaManager targetSchemaMgr = new SchemaManager(fileIO, tablePath,
targetBranch);
+
+ for (Long schemaId :
+ filesToMerge.stream()
+ .map(entry -> entry.file().schemaId())
+ .collect(Collectors.toSet())) {
+ checkArgument(
+ targetSchemaMgr.schemaExists(schemaId),
+ "Cannot merge branch '%s' into '%s', because target branch
does not contain "
+ + "schema id %s required by source files.",
+ sourceBranch,
+ targetBranch,
+ schemaId);
+
+ TableSchema sourceSchema = sourceSchemaMgr.schema(schemaId);
+ TableSchema targetSchema = targetSchemaMgr.schema(schemaId);
+ checkArgument(
+ sourceSchema.equals(targetSchema),
+ "Cannot merge branch '%s' into '%s', schema history
mismatch for schema id %s.",
+ sourceBranch,
+ targetBranch,
+ schemaId);
+ }
+ }
+
+ private void validateAppendOnly(String sourceBranch, String targetBranch) {
+ SchemaManager sourceSchemaMgr = new SchemaManager(fileIO, tablePath,
sourceBranch);
+ TableSchema sourceSchema = sourceSchemaMgr.latest().get();
+ checkArgument(
+ sourceSchema.primaryKeys().isEmpty(),
+ "Branch merge is only supported for append-only tables, "
+ + "but branch '%s' has primary keys.",
+ sourceBranch);
+
+ SchemaManager targetSchemaMgr = new SchemaManager(fileIO, tablePath,
targetBranch);
+ TableSchema targetSchema = targetSchemaMgr.latest().get();
+ checkArgument(
+ targetSchema.primaryKeys().isEmpty(),
+ "Branch merge is only supported for append-only tables, "
+ + "but branch '%s' has primary keys.",
+ targetBranch);
+ }
+
+ // Branch merge is implemented as a conservative file-level merge. Without
persisted branch
+ // lineage metadata, we cannot reliably infer a fork point after snapshots
expire. To preserve
+ // correctness, both branches must retain complete append-only history
from the first snapshot.
+ // This restriction can be relaxed in the future if branch fork metadata
is introduced.
+ private void validateAppendOnlyHistory(String sourceBranch, String
targetBranch) {
+
validateCompleteAppendOnly(snapshotManager.copyWithBranch(sourceBranch),
sourceBranch);
+
validateCompleteAppendOnly(snapshotManager.copyWithBranch(targetBranch),
targetBranch);
+ }
+
+ private void validateCompleteAppendOnly(SnapshotManager sm, String branch)
{
+ Long earliest = sm.earliestSnapshotId();
+ Long latest = sm.latestSnapshotId();
+ if (earliest == null || latest == null) {
+ return;
+ }
+ if (earliest != Snapshot.FIRST_SNAPSHOT_ID) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot merge: branch '%s' does not start at
snapshot %d "
+ + "(earliest is %d). "
+ + "Branch merge requires complete
append-only snapshot history.",
+ branch, Snapshot.FIRST_SNAPSHOT_ID, earliest));
+ }
+ for (long id = Snapshot.FIRST_SNAPSHOT_ID; id <= latest; id++) {
+ if (!sm.snapshotExists(id)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot merge: snapshot %d is missing on
branch '%s'. "
+ + "Branch merge requires complete
append-only snapshot history.",
+ id, branch));
+ }
+ Snapshot snapshot = sm.snapshot(id);
+ Snapshot.CommitKind kind = snapshot.commitKind();
+ if (kind != Snapshot.CommitKind.APPEND && kind !=
Snapshot.CommitKind.ANALYZE) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot merge: snapshot %d on branch '%s' has
commit kind '%s'. "
+ + "Branch merge requires complete
append-only snapshot history.",
+ id, branch, kind));
+ }
+ }
+ }
+
+ private void validateNoDataEvolution(String sourceBranch, String
targetBranch) {
+ for (String branch : new String[] {sourceBranch, targetBranch}) {
+ SchemaManager sm = new SchemaManager(fileIO, tablePath, branch);
+ TableSchema schema = sm.latest().get();
+ CoreOptions opts = new CoreOptions(schema.options());
+ checkArgument(
+ !opts.dataEvolutionEnabled(),
+ "Branch merge is not supported for data-evolution tables
(branch '%s').",
+ branch);
+ }
+ }
+
+ private void validateRowTrackingConsistent(String sourceBranch, String
targetBranch) {
+ boolean sourceEnabled = isRowTrackingEnabled(sourceBranch);
+ boolean targetEnabled = isRowTrackingEnabled(targetBranch);
+ checkArgument(
+ sourceEnabled == targetEnabled,
+ "Cannot merge branch '%s' into '%s': row-tracking settings
must match "
+ + "(source=%s, target=%s).",
+ sourceBranch,
+ targetBranch,
+ sourceEnabled,
+ targetEnabled);
+ }
+
+ private boolean isRowTrackingEnabled(String branch) {
+ SchemaManager schemaManager = new SchemaManager(fileIO, tablePath,
branch);
+ TableSchema schema = schemaManager.latest().get();
+ return new CoreOptions(schema.options()).rowTrackingEnabled();
+ }
+
+ private List<ManifestEntry> computeMergeDiff(String sourceBranch, String
targetBranch) {
+ Set<FileEntry.Identifier> targetFileIds =
+ mergeHandler.readBranchFiles(targetBranch).keySet();
+
+ Map<FileEntry.Identifier, ManifestEntry> sourceFiles =
+ mergeHandler.readBranchFiles(sourceBranch);
+
+ List<ManifestEntry> filesToMerge = new ArrayList<>();
+ for (Map.Entry<FileEntry.Identifier, ManifestEntry> entry :
sourceFiles.entrySet()) {
+ if (!targetFileIds.contains(entry.getKey())) {
+ ManifestEntry manifestEntry = entry.getValue();
+ if (manifestEntry.kind() == FileKind.ADD) {
+ filesToMerge.add(manifestEntry);
+ }
+ }
+ }
+ return filesToMerge;
+ }
+
/** Check if a branch exists. */
public boolean branchExists(String branchName) {
Path branchPath = branchPath(branchName);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 30ccc652a0..c91dbb4c8e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.append.AppendCompactTask;
import org.apache.paimon.bucket.DefaultBucketFunction;
import org.apache.paimon.data.BinaryRow;
@@ -37,6 +38,8 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
@@ -73,6 +76,7 @@ import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchMergeHandler;
import org.apache.paimon.utils.RoaringBitmap32;
import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -86,8 +90,10 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -117,6 +123,7 @@ import static org.apache.paimon.CoreOptions.WRITE_ONLY;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
+import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -1699,4 +1706,1101 @@ public class AppendOnlySimpleTableTest extends
SimpleTableTestBase {
""));
return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, tableSchema);
}
+
+    /**
+     * Creates the table that the branch-merge tests operate on.
+     *
+     * <p>Delegates to {@code createFileStoreTable()} without extra options.
+     */
+    protected FileStoreTable createBranchMergeTable() throws Exception {
+        return createFileStoreTable();
+    }
+
+    /**
+     * Creates the table that the branch-merge tests operate on, applying extra options.
+     *
+     * @param extraConfigure callback passed through unchanged to {@code createFileStoreTable}
+     */
+    protected FileStoreTable createBranchMergeTable(Consumer<Options>
extraConfigure)
+            throws Exception {
+        return createFileStoreTable(extraConfigure);
+    }
+
+    /**
+     * Merges a branch into main after both sides received their own append and checks that main
+     * then reads the base row plus one row from each side.
+     */
+    @Test
+    public void testMergeBranch() throws Exception {
+        FileStoreTable mainTable = createBranchMergeTable();
+
+        // Base row on main, committed before the branch is forked
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        // Fork the branch off a tag
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+
+        // Branch-only row
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(1, 10, 100L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+
+        // Main-only row, written after the fork
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(2, 20, 200L));
+            committer.commit(2, writer.prepareCommit(false, 2));
+        }
+
+        mainTable.mergeBranch(BRANCH_NAME, "main");
+
+        // Main must now expose the rows of both sides
+        List<String> rowsOnMain =
+                getResult(
+                        mainTable.newRead(),
+                        toSplits(mainTable.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+        assertThat(rowsOnMain)
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
+    /**
+     * Merges the same branch into main twice, with a branch write before each merge, and checks
+     * that every row shows up on main exactly once.
+     */
+    @Test
+    public void testMergeBranchMultipleTimes() throws Exception {
+        FileStoreTable mainTable = createBranchMergeTable();
+
+        // Base row on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        // Fork the branch off a tag
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+
+        // Round one: branch write followed by a merge
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(1, 10, 100L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+        mainTable.mergeBranch(BRANCH_NAME, "main");
+
+        // Round two: another branch write followed by another merge
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(2, 20, 200L));
+            committer.commit(2, writer.prepareCommit(false, 3));
+        }
+        mainTable.mergeBranch(BRANCH_NAME, "main");
+
+        // No duplicates: each of the three rows appears once
+        List<String> rowsOnMain =
+                getResult(
+                        mainTable.newRead(),
+                        toSplits(mainTable.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+        assertThat(rowsOnMain)
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
+    /**
+     * Committing the same merge file list twice must be rejected.
+     *
+     * <p>The files that exist only on the branch are computed once through {@link
+     * BranchMergeHandler} and committed to main twice. The second commit is expected to throw,
+     * checked via {@code anyCauseMatches(RuntimeException.class, "Trying to add file")}.
+     */
+    @Test
+    public void testMergeBranchFailsOnStaleDuplicateCommit() throws Exception {
+        FileStoreTable table = createBranchMergeTable();
+
+        // Seed main with one row before branching
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        // Fork the branch and write one row that main does not have
+        table.createBranch(BRANCH_NAME);
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        BranchMergeHandler handler = new BranchMergeHandler(table::switchToBranch);
+        Map<FileEntry.Identifier, ManifestEntry> sourceFiles = handler.readBranchFiles(BRANCH_NAME);
+        Map<FileEntry.Identifier, ManifestEntry> targetFiles = handler.readBranchFiles("main");
+
+        // Diff by file identifier: keep only the entries missing from the target
+        List<ManifestEntry> filesToMerge =
+                sourceFiles.entrySet().stream()
+                        .filter(entry -> !targetFiles.containsKey(entry.getKey()))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
+
+        // The first commit goes through; replaying the same, now stale, file list must fail
+        handler.commit("main", filesToMerge);
+        assertThatThrownBy(() -> handler.commit("main", filesToMerge))
+                .satisfies(anyCauseMatches(RuntimeException.class, "Trying to add file"));
+    }
+
+    /**
+     * Merges branch into main and then main into branch, and checks that both sides end up
+     * reading the same three rows.
+     */
+    @Test
+    public void testMergeBranchBidirectional() throws Exception {
+        FileStoreTable mainTable = createBranchMergeTable();
+
+        // Row shared by both sides: written to main before the fork
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        // Fork the branch off a tag
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+
+        // Branch-only row
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(1, 10, 100L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+
+        // Main-only row
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(2, 20, 200L));
+            committer.commit(2, writer.prepareCommit(false, 2));
+        }
+
+        // branch -> main, then main -> branch
+        mainTable.mergeBranch(BRANCH_NAME, "main");
+        mainTable.mergeBranch("main", BRANCH_NAME);
+
+        // Both sides must now read the same rows, without duplicates
+        List<String> rowsOnMain =
+                getResult(
+                        mainTable.newRead(),
+                        toSplits(mainTable.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+        List<String> rowsOnBranch =
+                getResult(
+                        branchTable.newRead(),
+                        toSplits(branchTable.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+
+        assertThat(rowsOnMain)
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+        assertThat(rowsOnBranch).containsExactlyInAnyOrderElementsOf(rowsOnMain);
+    }
+
+    /**
+     * Merging a branch that was never written to after the fork must not move main's latest
+     * snapshot id.
+     */
+    @Test
+    public void testMergeBranchEmptyDiff() throws Exception {
+        FileStoreTable mainTable = createBranchMergeTable();
+
+        // Single row on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        // Branch forked from the tag; nothing is written to it afterwards
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+
+        // Expect a no-op: no exception and the same latest snapshot id before and after
+        long idBeforeMerge = mainTable.snapshotManager().latestSnapshotId();
+        mainTable.mergeBranch(BRANCH_NAME, "main");
+        long idAfterMerge = mainTable.snapshotManager().latestSnapshotId();
+        assertThat(idAfterMerge).isEqualTo(idBeforeMerge);
+    }
+
+    /**
+     * Adds a column on main after the branch was forked and expects the merge to be rejected
+     * with a schema-mismatch error.
+     */
+    @Test
+    public void testMergeBranchSchemaConflict() throws Exception {
+        FileStoreTable mainTable = createBranchMergeTable();
+
+        // Single row on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        // Fork the branch off a tag
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+
+        // Evolve main's schema only: add a column
+        SchemaManager mainSchemas = new SchemaManager(mainTable.fileIO(), mainTable.location());
+        mainSchemas.commitChanges(SchemaChange.addColumn("new_col", DataTypes.INT()));
+
+        // The two schemas now differ, so the merge has to fail
+        assertThatThrownBy(() -> mainTable.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Cannot merge branch 'branch1' into 'main', schema mismatch."));
+    }
+
+    /**
+     * Gives the branch and main an add-then-drop column history with different column names
+     * ({@code source_col} vs {@code target_col}) and expects the merge to be rejected with a
+     * schema history mismatch for schema id 1.
+     */
+    @Test
+    public void testMergeBranchSchemaHistoryConflict() throws Exception {
+        FileStoreTable mainTable = createBranchMergeTable();
+
+        // Single row on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+
+        // Branch side: add a column, write one row, then drop the column again
+        SchemaManager branchSchemas =
+                new SchemaManager(mainTable.fileIO(), mainTable.location(), BRANCH_NAME);
+        branchSchemas.commitChanges(SchemaChange.addColumn("source_col", DataTypes.INT()));
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+        try (BatchTableWrite writer =
+                        branchTable.newBatchWriteBuilder().newWrite().withWriteType(ROW_TYPE);
+                BatchTableCommit committer = branchTable.newBatchWriteBuilder().newCommit()) {
+            writer.write(rowData(1, 10, 100L));
+            committer.commit(writer.prepareCommit());
+        }
+        branchSchemas.commitChanges(
+                Collections.singletonList(SchemaChange.dropColumn("source_col")));
+
+        // Main side: add and drop a differently named column
+        SchemaManager mainSchemas = new SchemaManager(mainTable.fileIO(), mainTable.location());
+        mainSchemas.commitChanges(SchemaChange.addColumn("target_col", DataTypes.INT()));
+        mainSchemas.commitChanges(
+                Collections.singletonList(SchemaChange.dropColumn("target_col")));
+
+        assertThatThrownBy(() -> mainTable.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "schema history mismatch for schema id 1"));
+    }
+
+    /**
+     * Branch merge on a table created with row tracking enabled: checks the merged rows, that
+     * row-id ranges do not overlap, and that the latest snapshot's next row id is 3.
+     */
+    @Test
+    public void testMergeBranchRowTrackingTable() throws Exception {
+        FileStoreTable mainTable =
+                createBranchMergeTable(conf -> conf.set(CoreOptions.ROW_TRACKING_ENABLED, true));
+
+        // Base row on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+
+        // One row on the branch
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(1, 10, 100L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+
+        // One more row on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(2, 20, 200L));
+            committer.commit(2, writer.prepareCommit(false, 2));
+        }
+
+        mainTable.mergeBranch(BRANCH_NAME, "main");
+
+        List<String> rowsOnMain =
+                getResult(
+                        mainTable.newRead(),
+                        toSplits(mainTable.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+        assertThat(rowsOnMain)
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        assertRowIdRangesNonOverlapping(mainTable);
+        Snapshot latest = mainTable.snapshotManager().latestSnapshot();
+        assertThat(latest.nextRowId()).isEqualTo(3L);
+    }
+
+    /**
+     * Merges a row-tracking branch into main twice, with a branch write before each merge, and
+     * checks rows, non-overlapping row-id ranges and a final next row id of 3.
+     */
+    @Test
+    public void testMergeBranchRowTrackingMultipleTimes() throws Exception {
+        FileStoreTable mainTable =
+                createBranchMergeTable(conf -> conf.set(CoreOptions.ROW_TRACKING_ENABLED, true));
+
+        // Base row on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+
+        // Round one: branch write, then merge
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(1, 10, 100L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+        mainTable.mergeBranch(BRANCH_NAME, "main");
+
+        // Round two: branch write, then merge
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(2, 20, 200L));
+            committer.commit(2, writer.prepareCommit(false, 3));
+        }
+        mainTable.mergeBranch(BRANCH_NAME, "main");
+
+        List<String> rowsOnMain =
+                getResult(
+                        mainTable.newRead(),
+                        toSplits(mainTable.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+        assertThat(rowsOnMain)
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        assertRowIdRangesNonOverlapping(mainTable);
+        Snapshot latest = mainTable.snapshotManager().latestSnapshot();
+        assertThat(latest.nextRowId()).isEqualTo(3L);
+    }
+
+    /**
+     * Row-tracking merge where main received three rows of its own after the fork: expects six
+     * rows in total, non-overlapping row-id ranges and a next row id of 6.
+     */
+    @Test
+    public void testMergeBranchRowTrackingAfterTargetWrites() throws Exception {
+        FileStoreTable mainTable =
+                createBranchMergeTable(conf -> conf.set(CoreOptions.ROW_TRACKING_ENABLED, true));
+
+        // Base row on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+
+        // Two rows on the branch
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(1, 10, 100L));
+            writer.write(rowData(1, 11, 101L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+
+        // Three independent rows on main, which advances main's nextRowId
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(2, 20, 200L));
+            writer.write(rowData(2, 21, 201L));
+            writer.write(rowData(2, 22, 202L));
+            committer.commit(2, writer.prepareCommit(false, 2));
+        }
+
+        mainTable.mergeBranch(BRANCH_NAME, "main");
+
+        List<String> rowsOnMain =
+                getResult(
+                        mainTable.newRead(),
+                        toSplits(mainTable.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+        assertThat(rowsOnMain).hasSize(6);
+
+        assertRowIdRangesNonOverlapping(mainTable);
+        Snapshot latest = mainTable.snapshotManager().latestSnapshot();
+        assertThat(latest.nextRowId()).isEqualTo(6L);
+    }
+
+    /**
+     * Row-tracking merge from {@code branchA} into {@code branchB}, neither of which is main:
+     * checks branchB's rows, non-overlapping row-id ranges and a next row id of 3.
+     */
+    @Test
+    public void testMergeBranchRowTrackingBetweenNonMainBranches() throws Exception {
+        FileStoreTable mainTable =
+                createBranchMergeTable(conf -> conf.set(CoreOptions.ROW_TRACKING_ENABLED, true));
+
+        // Base row on main, shared by both branches through the tag
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch("branchA", "tag1");
+        mainTable.createBranch("branchB", "tag1");
+        FileStoreTable sourceBranch = mainTable.switchToBranch("branchA");
+        FileStoreTable targetBranch = mainTable.switchToBranch("branchB");
+
+        // One row on branchA
+        try (StreamTableWrite writer = sourceBranch.newWrite(commitUser);
+                StreamTableCommit committer = sourceBranch.newCommit(commitUser)) {
+            writer.write(rowData(1, 10, 100L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+
+        // One row on branchB
+        try (StreamTableWrite writer = targetBranch.newWrite(commitUser);
+                StreamTableCommit committer = targetBranch.newCommit(commitUser)) {
+            writer.write(rowData(2, 20, 200L));
+            committer.commit(2, writer.prepareCommit(false, 2));
+        }
+
+        mainTable.mergeBranch("branchA", "branchB");
+
+        // Obtain a fresh handle on branchB before reading
+        FileStoreTable reloadedTarget = mainTable.switchToBranch("branchB");
+        List<String> rowsOnTarget =
+                getResult(
+                        reloadedTarget.newRead(),
+                        toSplits(reloadedTarget.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+        assertThat(rowsOnTarget)
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+
+        assertRowIdRangesNonOverlapping(reloadedTarget);
+        Snapshot latest = reloadedTarget.snapshotManager().latestSnapshot();
+        assertThat(latest.nextRowId()).isEqualTo(3L);
+    }
+
+    /**
+     * A merge must be rejected when only one side has the row-tracking option set.
+     *
+     * <p>The table is created with {@link CoreOptions#ROW_TRACKING_ENABLED} set to true. A new
+     * schema version without that option is then committed directly on the branch, and the merge
+     * into main is expected to fail with "row-tracking settings must match".
+     */
+    @Test
+    public void testMergeBranchRowTrackingMismatch() throws Exception {
+        FileStoreTable table =
+                createBranchMergeTable(
+                        options -> options.set(CoreOptions.ROW_TRACKING_ENABLED, true));
+
+        // Base row on main
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(0, 0, 0L));
+            commit.commit(0, write.prepareCommit(false, 1));
+        }
+
+        table.createTag("tag1", 1);
+        table.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+        // One row on the branch so there is something to merge
+        try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+                StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        // Commit a copy of the branch's latest schema (id + 1) with the row-tracking option
+        // removed. The key comes from the same constant used to enable the option above, so the
+        // two cannot drift apart.
+        SchemaManager branchSchemaManager =
+                new SchemaManager(table.fileIO(), table.location(), BRANCH_NAME);
+        TableSchema branchSchema = branchSchemaManager.latest().get();
+        Map<String, String> newOptions = new HashMap<>(branchSchema.options());
+        newOptions.remove(CoreOptions.ROW_TRACKING_ENABLED.key());
+        TableSchema mismatchedSchema =
+                new TableSchema(
+                        branchSchema.version(),
+                        branchSchema.id() + 1,
+                        branchSchema.fields(),
+                        branchSchema.highestFieldId(),
+                        branchSchema.partitionKeys(),
+                        branchSchema.primaryKeys(),
+                        newOptions,
+                        branchSchema.comment(),
+                        branchSchema.timeMillis());
+        branchSchemaManager.commit(mismatchedSchema);
+
+        assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "row-tracking settings must match"));
+    }
+
+    /**
+     * Row-tracking merge of a branch whose single write happened before several further writes
+     * on main, followed by a second merge with no new branch data.
+     *
+     * <p>After the first merge main holds five rows and its next row id is 5. The second merge
+     * must not move the latest snapshot id; by then main has one more row and its next row id
+     * is 6.
+     */
+    @Test
+    public void testMergeBranchRowTrackingStaleMerge() throws Exception {
+        FileStoreTable mainTable =
+                createBranchMergeTable(conf -> conf.set(CoreOptions.ROW_TRACKING_ENABLED, true));
+
+        // Base row on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+
+        // One row on the branch
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(1, 10, 100L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+
+        // Two further commits on main (one row, then two rows) to advance its nextRowId
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(2, 20, 200L));
+            committer.commit(2, writer.prepareCommit(false, 2));
+        }
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(3, 30, 300L));
+            writer.write(rowData(3, 31, 301L));
+            committer.commit(3, writer.prepareCommit(false, 3));
+        }
+
+        // First merge: the branch file should get a firstRowId after all main files
+        mainTable.mergeBranch(BRANCH_NAME, "main");
+
+        List<String> rowsAfterFirstMerge =
+                getResult(
+                        mainTable.newRead(),
+                        toSplits(mainTable.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+        assertThat(rowsAfterFirstMerge).hasSize(5);
+
+        assertRowIdRangesNonOverlapping(mainTable);
+        Snapshot afterFirstMerge = mainTable.snapshotManager().latestSnapshot();
+        assertThat(afterFirstMerge.nextRowId()).isEqualTo(5L);
+
+        // One more row on main; the branch itself has nothing new
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(4, 40, 400L));
+            committer.commit(4, writer.prepareCommit(false, 4));
+        }
+
+        // Second merge should be a no-op because the branch file is already in the target
+        long idBeforeSecondMerge = mainTable.snapshotManager().latestSnapshotId();
+        mainTable.mergeBranch(BRANCH_NAME, "main");
+        long idAfterSecondMerge = mainTable.snapshotManager().latestSnapshotId();
+        assertThat(idAfterSecondMerge).isEqualTo(idBeforeSecondMerge);
+
+        assertRowIdRangesNonOverlapping(mainTable);
+        Snapshot afterSecondMerge = mainTable.snapshotManager().latestSnapshot();
+        assertThat(afterSecondMerge.nextRowId()).isEqualTo(6L);
+    }
+
+    /**
+     * Asserts that no two data files in the table's latest snapshot cover overlapping row-id
+     * ranges.
+     *
+     * <p>Each file with a non-null {@code firstRowId} contributes the inclusive range {@code
+     * [firstRowId, firstRowId + rowCount - 1]}; files without a first row id are ignored. Ranges
+     * are sorted by start and every range must begin strictly after the previous one ends.
+     *
+     * @param table table whose latest snapshot is inspected
+     */
+    private void assertRowIdRangesNonOverlapping(FileStoreTable table) {
+        ManifestList manifestLists = table.store().manifestListFactory().create();
+        ManifestFile manifests = table.store().manifestFileFactory().create();
+        Snapshot latest = table.snapshotManager().latestSnapshot();
+
+        // Collect the manifest entries of the latest snapshot, keyed by file identifier
+        Map<FileEntry.Identifier, ManifestEntry> entriesById = new LinkedHashMap<>();
+        FileEntry.mergeEntries(
+                manifests, manifestLists.readDataManifests(latest), entriesById, null);
+
+        // One {start, end} pair per file that carries a first row id
+        List<long[]> rowIdRanges = new ArrayList<>();
+        for (ManifestEntry entry : entriesById.values()) {
+            if (entry.file().firstRowId() == null) {
+                continue;
+            }
+            long first = entry.file().firstRowId();
+            long last = first + entry.file().rowCount() - 1;
+            rowIdRanges.add(new long[] {first, last});
+        }
+        rowIdRanges.sort(Comparator.comparingLong(range -> range[0]));
+
+        // After sorting by start, comparing neighbours is enough to detect any overlap
+        long[] previous = null;
+        for (long[] current : rowIdRanges) {
+            if (previous != null) {
+                assertTrue(
+                        current[0] > previous[1],
+                        String.format(
+                                "Row-id ranges overlap: [%d, %d] and [%d, %d]",
+                                previous[0], previous[1], current[0], current[1]));
+            }
+            previous = current;
+        }
+    }
+
+    /** Merging a branch into itself must be rejected with an {@code IllegalArgumentException}. */
+    @Test
+    public void testMergeBranchSameBranch() throws Exception {
+        FileStoreTable mainTable = createFileStoreTable();
+
+        // Source and target are the same branch name
+        assertThatThrownBy(() -> mainTable.mergeBranch(BRANCH_NAME, BRANCH_NAME))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Cannot merge branch 'branch1' into itself."));
+    }
+
+    /**
+     * Main and the branch each write a row whose first field is 0 (the partition value, per the
+     * inline comments); after the merge main must read both rows.
+     */
+    @Test
+    public void testMergeBranchSamePartition() throws Exception {
+        FileStoreTable mainTable = createBranchMergeTable();
+
+        // Row on main in partition pt=0
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        // Fork the branch off a tag
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+
+        // Row on the branch, again in partition pt=0
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 10, 100L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+
+        mainTable.mergeBranch(BRANCH_NAME, "main");
+
+        // Both files must coexist in the same partition
+        List<String> rowsOnMain =
+                getResult(
+                        mainTable.newRead(),
+                        toSplits(mainTable.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+        assertThat(rowsOnMain)
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "0|10|100|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
+    /** Merging from a source branch that was never created must be rejected. */
+    @Test
+    public void testMergeBranchNonExistentBranch() throws Exception {
+        FileStoreTable mainTable = createFileStoreTable();
+
+        // Give main one snapshot; no branch is created
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        assertThatThrownBy(() -> mainTable.mergeBranch("nonexistent", "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch 'nonexistent' doesn't exist."));
+    }
+
+    /**
+     * Branch merge on a table configured with {@code CoreOptions.BUCKET = 2}: two rows are
+     * written on each side and main must read all four after the merge.
+     */
+    @Test
+    public void testMergeBranchMultiBucket() throws Exception {
+        FileStoreTable mainTable =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.BUCKET, 2);
+                        });
+
+        // Two rows on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            writer.write(rowData(0, 1, 1L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+
+        // Two rows on the branch
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(1, 10, 100L));
+            writer.write(rowData(1, 11, 110L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+
+        mainTable.mergeBranch(BRANCH_NAME, "main");
+
+        List<String> rowsOnMain =
+                getResult(
+                        mainTable.newRead(),
+                        toSplits(mainTable.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+        assertThat(rowsOnMain)
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "0|1|1|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|11|110|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
+    /** Merging into a target branch that was never created must be rejected. */
+    @Test
+    public void testMergeBranchNonExistentTargetBranch() throws Exception {
+        FileStoreTable mainTable = createFileStoreTable();
+
+        // Give main one snapshot
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        // The source branch exists; the target name does not
+        mainTable.createBranch(BRANCH_NAME);
+
+        assertThatThrownBy(() -> mainTable.mergeBranch(BRANCH_NAME, "nonexistent"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch 'nonexistent' doesn't exist."));
+    }
+
+    /**
+     * Merges {@code branchA} into {@code branchB}, both forked from the same tag, and checks
+     * that branchB then reads the base row plus one row from each branch.
+     */
+    @Test
+    public void testMergeBranchBetweenNonMainBranches() throws Exception {
+        FileStoreTable mainTable = createBranchMergeTable();
+
+        // Base row on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        // Both branches start from the same tag so they share the base data
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch("branchA", "tag1");
+        mainTable.createBranch("branchB", "tag1");
+        FileStoreTable sourceBranch = mainTable.switchToBranch("branchA");
+        FileStoreTable targetBranch = mainTable.switchToBranch("branchB");
+
+        // One row on branchA
+        try (StreamTableWrite writer = sourceBranch.newWrite(commitUser);
+                StreamTableCommit committer = sourceBranch.newCommit(commitUser)) {
+            writer.write(rowData(1, 10, 100L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+
+        // One row on branchB
+        try (StreamTableWrite writer = targetBranch.newWrite(commitUser);
+                StreamTableCommit committer = targetBranch.newCommit(commitUser)) {
+            writer.write(rowData(2, 20, 200L));
+            committer.commit(2, writer.prepareCommit(false, 2));
+        }
+
+        // branchA -> branchB
+        mainTable.mergeBranch("branchA", "branchB");
+
+        // Reload branchB to see the merged state
+        FileStoreTable reloadedTarget = mainTable.switchToBranch("branchB");
+        List<String> rowsOnTarget =
+                getResult(
+                        reloadedTarget.newRead(),
+                        toSplits(reloadedTarget.newSnapshotReader().read().dataSplits()),
+                        BATCH_ROW_TO_STRING);
+        assertThat(rowsOnTarget)
+                .containsExactlyInAnyOrder(
+                        "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+                        "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+                        "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+    }
+
+    /**
+     * With snapshot retention set to min = max = 1, the branch takes four commits and then runs
+     * snapshot expiration; the merge into main is expected to be rejected for lacking complete
+     * append-only snapshot history.
+     */
+    @Test
+    public void testMergeBranchAfterSnapshotExpiration() throws Exception {
+        FileStoreTable mainTable =
+                createBranchMergeTable(
+                        conf -> {
+                            conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1);
+                            conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 1);
+                        });
+
+        // Base row on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+
+        // First branch commit
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(1, 10, 100L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+
+        // Three more branch commits so that there are snapshots to expire
+        for (int round = 2; round < 5; round++) {
+            try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                    StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+                writer.write(rowData(round, round * 10, (long) round * 100));
+                committer.commit(round, writer.prepareCommit(false, round + 1));
+            }
+        }
+
+        branchTable
+                .newExpireSnapshots()
+                .config(branchTable.coreOptions().expireConfig())
+                .expire();
+
+        // After expiration the baseline snapshot is gone, so the merge should fail
+        assertThatThrownBy(() -> mainTable.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch merge requires complete append-only snapshot history"));
+    }
+
+    /**
+     * An overwrite commit on the source branch must cause the merge into main to be rejected
+     * for lacking complete append-only snapshot history.
+     */
+    @Test
+    public void testMergeBranchRejectsNonAppendHistory() throws Exception {
+        FileStoreTable mainTable = createFileStoreTable();
+
+        // Base row on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+
+        // Regular append on the branch
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 10, 100L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+
+        // INSERT OVERWRITE on the branch, which creates an OVERWRITE snapshot
+        List<CommitMessage> overwriteMessages;
+        try (BatchTableWrite writer =
+                branchTable.newBatchWriteBuilder().withOverwrite().newWrite()) {
+            writer.write(rowData(0, 20, 200L));
+            overwriteMessages = writer.prepareCommit();
+        }
+        try (BatchTableCommit committer =
+                branchTable.newBatchWriteBuilder().withOverwrite().newCommit()) {
+            committer.commit(overwriteMessages);
+        }
+
+        assertThatThrownBy(() -> mainTable.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch merge requires complete append-only snapshot history"));
+    }
+
+    /**
+     * An overwrite commit on the target (main) must cause the merge to be rejected for lacking
+     * complete append-only snapshot history.
+     */
+    @Test
+    public void testMergeBranchRejectsTargetOverwrite() throws Exception {
+        FileStoreTable mainTable = createFileStoreTable();
+
+        // Base row on main
+        try (StreamTableWrite writer = mainTable.newWrite(commitUser);
+                StreamTableCommit committer = mainTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 0, 0L));
+            committer.commit(0, writer.prepareCommit(false, 1));
+        }
+
+        mainTable.createTag("tag1", 1);
+        mainTable.createBranch(BRANCH_NAME, "tag1");
+        FileStoreTable branchTable = mainTable.switchToBranch(BRANCH_NAME);
+
+        // Regular append on the branch
+        try (StreamTableWrite writer = branchTable.newWrite(commitUser);
+                StreamTableCommit committer = branchTable.newCommit(commitUser)) {
+            writer.write(rowData(0, 10, 100L));
+            committer.commit(1, writer.prepareCommit(false, 2));
+        }
+
+        // INSERT OVERWRITE on the target (main)
+        List<CommitMessage> overwriteMessages;
+        try (BatchTableWrite writer =
+                mainTable.newBatchWriteBuilder().withOverwrite().newWrite()) {
+            writer.write(rowData(0, 20, 200L));
+            overwriteMessages = writer.prepareCommit();
+        }
+        try (BatchTableCommit committer =
+                mainTable.newBatchWriteBuilder().withOverwrite().newCommit()) {
+            committer.commit(overwriteMessages);
+        }
+
+        assertThatThrownBy(() -> mainTable.mergeBranch(BRANCH_NAME, "main"))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch merge requires complete append-only snapshot history"));
+    }
+
+ @Test
+ public void testMergeBranchRejectsSourceOverwrite() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser)) {
+ write.write(rowData(0, 0, 0L));
+ commit.commit(0, write.prepareCommit(false, 1));
+ }
+
+ table.createTag("tag1", 1);
+ table.createBranch(BRANCH_NAME, "tag1");
+ FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+ try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+ StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+ write.write(rowData(0, 10, 100L));
+ commit.commit(1, write.prepareCommit(false, 2));
+ }
+
+ // INSERT OVERWRITE on source (branch)
+ List<CommitMessage> commitMessages;
+ try (BatchTableWrite write =
+ tableBranch.newBatchWriteBuilder().withOverwrite().newWrite())
{
+ write.write(rowData(0, 20, 200L));
+ commitMessages = write.prepareCommit();
+ }
+ try (BatchTableCommit commit =
+ tableBranch.newBatchWriteBuilder().withOverwrite().newCommit()) {
+ commit.commit(commitMessages);
+ }
+
+ assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Branch merge requires complete append-only snapshot history"));
+ }
+
+ @Test
+ public void testMergeBranchRejectsSourceExpiredSnapshots() throws Exception {
+ FileStoreTable table =
+ createBranchMergeTable(
+ conf -> {
+ conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1);
+ conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 1);
+ });
+
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser)) {
+ write.write(rowData(0, 0, 0L));
+ commit.commit(0, write.prepareCommit(false, 1));
+ }
+
+ table.createTag("tag1", 1);
+ table.createBranch(BRANCH_NAME, "tag1");
+ FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+ for (int i = 1; i < 5; i++) {
+ try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+ StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+ write.write(rowData(i, i * 10, (long) i * 100));
+ commit.commit(i, write.prepareCommit(false, i + 1));
+ }
+ }
+
+ tableBranch.newExpireSnapshots().config(tableBranch.coreOptions().expireConfig()).expire();
+
+ assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Branch merge requires complete append-only snapshot history"));
+ }
+
+ @Test
+ public void testMergeBranchRejectsTargetExpiredSnapshots() throws Exception {
+ FileStoreTable table =
+ createBranchMergeTable(
+ conf -> {
+ conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1);
+ conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 1);
+ });
+
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser)) {
+ write.write(rowData(0, 0, 0L));
+ commit.commit(0, write.prepareCommit(false, 1));
+ }
+
+ table.createTag("tag1", 1);
+ table.createBranch(BRANCH_NAME, "tag1");
+ FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+ try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+ StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+ write.write(rowData(1, 10, 100L));
+ commit.commit(1, write.prepareCommit(false, 2));
+ }
+
+ // Write multiple commits on main to allow expiration
+ for (int i = 2; i < 5; i++) {
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser)) {
+ write.write(rowData(i, i * 10, (long) i * 100));
+ commit.commit(i, write.prepareCommit(false, i + 1));
+ }
+ }
+
+ table.newExpireSnapshots().config(table.coreOptions().expireConfig()).expire();
+
+ assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Branch merge requires complete append-only snapshot history"));
+ }
+
+ @Test
+ public void testMergeBranchFromTagSucceeds() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser)) {
+ write.write(rowData(0, 0, 0L));
+ commit.commit(0, write.prepareCommit(false, 1));
+ }
+
+ table.createTag("tag1", 1);
+ table.createBranch(BRANCH_NAME, "tag1");
+ FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+ try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+ StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+ write.write(rowData(1, 10, 100L));
+ commit.commit(1, write.prepareCommit(false, 2));
+ }
+
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser)) {
+ write.write(rowData(2, 20, 200L));
+ commit.commit(2, write.prepareCommit(false, 2));
+ }
+
+ table.mergeBranch(BRANCH_NAME, "main");
+
+ assertThat(
+ getResult(
+ table.newRead(),
+ toSplits(table.newSnapshotReader().read().dataSplits()),
+ BATCH_ROW_TO_STRING))
+ .containsExactlyInAnyOrder(
+ "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+ "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+ "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+ }
+
+ @Test
+ public void testMergePlainBranchSucceedsWithCompleteHistory() throws Exception {
+ FileStoreTable table = createBranchMergeTable();
+
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser)) {
+ write.write(rowData(0, 0, 0L));
+ commit.commit(0, write.prepareCommit(false, 1));
+ }
+
+ table.createBranch(BRANCH_NAME);
+ FileStoreTable tableBranch = table.switchToBranch(BRANCH_NAME);
+
+ try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+ StreamTableCommit commit = tableBranch.newCommit(commitUser)) {
+ write.write(rowData(1, 10, 100L));
+ commit.commit(1, write.prepareCommit(false, 2));
+ }
+
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser)) {
+ write.write(rowData(2, 20, 200L));
+ commit.commit(2, write.prepareCommit(false, 2));
+ }
+
+ table.mergeBranch(BRANCH_NAME, "main");
+
+ assertThat(
+ getResult(
+ table.newRead(),
+ toSplits(table.newSnapshotReader().read().dataSplits()),
+ BATCH_ROW_TO_STRING))
+ .containsExactlyInAnyOrder(
+ "0|0|0|binary|varbinary|mapKey:mapVal|multiset",
+ "1|10|100|binary|varbinary|mapKey:mapVal|multiset",
+ "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 9401cce832..80cfb975bd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -144,6 +144,7 @@ import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
+import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -2672,4 +2673,21 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
""));
return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema);
}
+
+ @Test
+ public void testMergeBranchPrimaryKeyTable() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser)) {
+ write.write(rowData(0, 0, 0L));
+ commit.commit(0, write.prepareCommit(false, 1));
+ }
+
+ table.createTag("tag1", 1);
+ table.createBranch(BRANCH_NAME, "tag1");
+
+ assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
+ .satisfies(anyCauseMatches(IllegalArgumentException.class, "append-only tables"));
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java
index c74b53d3f3..109303a928 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java
@@ -69,7 +69,7 @@ class FileSystemBranchManagerTest {
tagManager = new TagManager(fileIO, tablePath);
branchManager =
new FileSystemBranchManager(
- fileIO, tablePath, snapshotManager, tagManager, schemaManager);
+ fileIO, tablePath, snapshotManager, tagManager, schemaManager, null);
}
@Test