This is an automated email from the ASF dual-hosted git repository.

liguojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 483a69cba Implement replace branch in BranchManager (#2911)
483a69cba is described below

commit 483a69cbaa93aa71661dca014ecac82443a402ed
Author: Xiaojian Sun <[email protected]>
AuthorDate: Tue May 28 15:35:23 2024 +0800

    Implement replace branch in BranchManager (#2911)
---
 .../paimon/privilege/PrivilegedFileStoreTable.java |   6 +
 .../org/apache/paimon/schema/SchemaManager.java    |  11 +-
 .../paimon/table/AbstractFileStoreTable.java       |   5 +
 .../org/apache/paimon/table/ReadonlyTable.java     |   8 +
 .../main/java/org/apache/paimon/table/Table.java   |   3 +
 .../org/apache/paimon/utils/BranchManager.java     | 163 +++++++++++++++++++--
 .../org/apache/paimon/utils/SnapshotManager.java   |  33 ++---
 .../java/org/apache/paimon/utils/TagManager.java   |  13 +-
 .../flink/procedure/ReplaceBranchProcedure.java    |  54 +++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../paimon/flink/action/BranchActionITCase.java    |  94 +++++++++++-
 11 files changed, 341 insertions(+), 50 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index 1881c7350..e4b09df38 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -204,6 +204,12 @@ public class PrivilegedFileStoreTable implements 
FileStoreTable {
         wrapped.deleteBranch(branchName);
     }
 
+    @Override
+    public void replaceBranch(String fromBranch) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.replaceBranch(fromBranch);
+    }
+
     @Override
     public ExpireSnapshots newExpireSnapshots() {
         privilegeChecker.assertCanInsert(identifier);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 228d30d5c..cb478d7ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -68,7 +68,6 @@ import static 
org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
 import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.BranchManager.getBranchPath;
-import static org.apache.paimon.utils.BranchManager.isMainBranch;
 import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
@@ -500,17 +499,13 @@ public class SchemaManager implements Serializable {
     }
 
     public Path schemaDirectory() {
-        return isMainBranch(branch)
-                ? new Path(tableRoot + "/schema")
-                : new Path(getBranchPath(tableRoot, branch) + "/schema");
+        return new Path(getBranchPath(fileIO, tableRoot, branch) + "/schema");
     }
 
     @VisibleForTesting
     public Path toSchemaPath(long schemaId) {
-        return isMainBranch(branch)
-                ? new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + schemaId)
-                : new Path(
-                        getBranchPath(tableRoot, branch) + "/schema/" + 
SCHEMA_PREFIX + schemaId);
+        return new Path(
+                getBranchPath(fileIO, tableRoot, branch) + "/schema/" + 
SCHEMA_PREFIX + schemaId);
     }
 
     /**
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 655e81431..82cc47ad5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -525,6 +525,11 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         branchManager().deleteBranch(branchName);
     }
 
+    @Override
+    public void replaceBranch(String fromBranch) {
+        branchManager().replaceBranch(fromBranch);
+    }
+
     @Override
     public void rollbackTo(String tagName) {
         TagManager tagManager = tagManager();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index 42bea3f68..dcb62dfcb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -182,6 +182,14 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
+    @Override
+    default void replaceBranch(String fromBranch) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support replaceBranch.",
+                        this.getClass().getSimpleName()));
+    }
+
     @Override
     default ExpireSnapshots newExpireSnapshots() {
         throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java 
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 876908394..d01ecc95c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -111,6 +111,9 @@ public interface Table extends Serializable {
     @Experimental
     void deleteBranch(String branchName);
 
+    @Experimental
+    void replaceBranch(String fromBranch);
+
     /** Manually expire snapshots, parameters can be controlled independently 
of table options. */
     @Experimental
     ExpireSnapshots newExpireSnapshots();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 41099dfac..9742d63ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -33,9 +33,12 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.stream.Collectors;
 
@@ -49,6 +52,7 @@ public class BranchManager {
 
     public static final String BRANCH_PREFIX = "branch-";
     public static final String DEFAULT_MAIN_BRANCH = "main";
+    public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH";
 
     private final FileIO fileIO;
     private final Path tablePath;
@@ -69,6 +73,12 @@ public class BranchManager {
         this.schemaManager = schemaManager;
     }
 
+    /** Commit specify branch to main. */
+    public void commitMainBranch(String branchName) throws IOException {
+        Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
+        fileIO.overwriteFileUtf8(mainBranchFile, branchName);
+    }
+
     /** Return the root Directory of branch. */
     public Path branchDirectory() {
         return new Path(tablePath + "/branch");
@@ -79,13 +89,45 @@ public class BranchManager {
     }
 
     /** Return the path string of a branch. */
-    public static String getBranchPath(Path tablePath, String branchName) {
-        return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName;
+    public static String getBranchPath(FileIO fileIO, Path tablePath, String 
branch) {
+        if (isMainBranch(branch)) {
+            Path path = new Path(tablePath, MAIN_BRANCH_FILE);
+            try {
+                if (fileIO.exists(path)) {
+                    String data = fileIO.readFileUtf8(path);
+                    if (StringUtils.isBlank(data)) {
+                        return tablePath.toString();
+                    } else {
+                        return tablePath.toString() + "/branch/" + 
BRANCH_PREFIX + data;
+                    }
+                } else {
+                    return tablePath.toString();
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branch;
+    }
+
+    public String defaultMainBranch() {
+        Path path = new Path(tablePath, MAIN_BRANCH_FILE);
+        try {
+            if (fileIO.exists(path)) {
+                String data = fileIO.readFileUtf8(path);
+                if (!StringUtils.isBlank(data)) {
+                    return data;
+                }
+            }
+            return DEFAULT_MAIN_BRANCH;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     /** Return the path of a branch. */
     public Path branchPath(String branchName) {
-        return new Path(getBranchPath(tablePath, branchName));
+        return new Path(getBranchPath(fileIO, tablePath, branchName));
     }
 
     /** Create empty branch. */
@@ -111,7 +153,7 @@ public class BranchManager {
             throw new RuntimeException(
                     String.format(
                             "Exception occurs when create branch '%s' 
(directory in %s).",
-                            branchName, getBranchPath(tablePath, branchName)),
+                            branchName, getBranchPath(fileIO, tablePath, 
branchName)),
                     e);
         }
     }
@@ -143,17 +185,17 @@ public class BranchManager {
             throw new RuntimeException(
                     String.format(
                             "Exception occurs when create branch '%s' 
(directory in %s).",
-                            branchName, getBranchPath(tablePath, branchName)),
+                            branchName, getBranchPath(fileIO, tablePath, 
branchName)),
                     e);
         }
     }
 
     public void createBranch(String branchName, String tagName) {
+        String mainBranch = defaultMainBranch();
         checkArgument(
                 !isMainBranch(branchName),
                 String.format(
-                        "Branch name '%s' is the default branch and cannot be 
used.",
-                        DEFAULT_MAIN_BRANCH));
+                        "Branch name '%s' is the default branch and cannot be 
used.", mainBranch));
         checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is 
blank.", branchName);
         checkArgument(!branchExists(branchName), "Branch name '%s' already 
exists.", branchName);
         checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not 
exists.", tagName);
@@ -179,7 +221,7 @@ public class BranchManager {
             throw new RuntimeException(
                     String.format(
                             "Exception occurs when create branch '%s' 
(directory in %s).",
-                            branchName, getBranchPath(tablePath, branchName)),
+                            branchName, getBranchPath(fileIO, tablePath, 
branchName)),
                     e);
         }
     }
@@ -193,11 +235,111 @@ public class BranchManager {
             LOG.info(
                     String.format(
                             "Deleting the branch failed due to an exception in 
deleting the directory %s. Please try again.",
-                            getBranchPath(tablePath, branchName)),
+                            getBranchPath(fileIO, tablePath, branchName)),
                     e);
         }
     }
 
+    /** Replace specify branch to main branch. */
+    public void replaceBranch(String branchName) {
+        String mainBranch = defaultMainBranch();
+        checkArgument(
+                !isMainBranch(branchName),
+                String.format(
+                        "Branch name '%s' is the default main branch and 
cannot be replaced repeatedly.",
+                        mainBranch));
+        checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is 
blank.", branchName);
+        checkArgument(branchExists(branchName), "Branch name '%s' not 
exists.", branchName);
+        try {
+            // 0. Cache previous tag,snapshot,schema directory.
+            Path tagDirectory = tagManager.tagDirectory();
+            Path snapshotDirectory = snapshotManager.snapshotDirectory();
+            Path schemaDirectory = schemaManager.schemaDirectory();
+            // 1. Calculate and copy the snapshots, tags and schemas which 
should be copied from the
+            // main to branch.
+            calculateCopyMainToBranch(branchName);
+            // 2. Update the Main Branch File to the target branch.
+            commitMainBranch(branchName);
+            // 3.Drop the previous main branch, including snapshots, tags and 
schemas.
+            dropPreviousMainBranch(tagDirectory, snapshotDirectory, 
schemaDirectory);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Calculate copy main branch to target branch. */
+    private void calculateCopyMainToBranch(String branchName) throws 
IOException {
+        TableBranch fromBranch =
+                this.branches().stream()
+                        .filter(branch -> 
branch.getBranchName().equals(branchName))
+                        .findFirst()
+                        .orElse(null);
+        if (fromBranch == null) {
+            throw new RuntimeException(String.format("No branches found %s", 
branchName));
+        }
+        Snapshot fromSnapshot = 
snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot());
+        // Copy tags.
+        List<String> tags = tagManager.allTagNames();
+        TagManager branchTagManager = tagManager.copyWithBranch(branchName);
+        for (String tagName : tags) {
+            if (branchTagManager.tagExists(tagName)) {
+                // If it already exists, skip it directly.
+                continue;
+            }
+            Snapshot snapshot = tagManager.taggedSnapshot(tagName);
+            if (snapshot.id() < fromSnapshot.id()) {
+                fileIO.copyFileUtf8(tagManager.tagPath(tagName), 
branchTagManager.tagPath(tagName));
+            }
+        }
+        // Copy snapshots.
+        Iterator<Snapshot> snapshots = snapshotManager.snapshots();
+        SnapshotManager branchSnapshotManager = 
snapshotManager.copyWithBranch(branchName);
+        while (snapshots.hasNext()) {
+            Snapshot snapshot = snapshots.next();
+            if (snapshot.id() >= fromSnapshot.id()) {
+                continue;
+            }
+            if (branchSnapshotManager.snapshotExists(snapshot.id())) {
+                // If it already exists, skip it directly.
+                continue;
+            }
+            fileIO.copyFileUtf8(
+                    snapshotManager.snapshotPath(snapshot.id()),
+                    branchSnapshotManager.snapshotPath(snapshot.id()));
+        }
+
+        // Copy schemas.
+        List<Long> schemaIds = schemaManager.listAllIds();
+        SchemaManager branchSchemaManager = 
schemaManager.copyWithBranch(branchName);
+        Set<Long> existsSchemas = new 
HashSet<>(branchSchemaManager.listAllIds());
+
+        for (Long schemaId : schemaIds) {
+            if (existsSchemas.contains(schemaId)) {
+                // If it already exists, skip it directly.
+                continue;
+            }
+            TableSchema tableSchema = schemaManager.schema(schemaId);
+            if (tableSchema.id() < fromSnapshot.schemaId()) {
+                fileIO.copyFileUtf8(
+                        schemaManager.toSchemaPath(schemaId),
+                        branchSchemaManager.toSchemaPath(schemaId));
+            }
+        }
+    }
+
+    /** Directly delete snapshot, tag , schema directory. */
+    private void dropPreviousMainBranch(
+            Path tagDirectory, Path snapshotDirectory, Path schemaDirectory) 
throws IOException {
+        // Delete tags.
+        fileIO.delete(tagDirectory, true);
+
+        // Delete snapshots.
+        fileIO.delete(snapshotDirectory, true);
+
+        // Delete schemas.
+        fileIO.delete(schemaDirectory, true);
+    }
+
     /** Check if path exists. */
     public boolean fileExists(Path path) {
         try {
@@ -246,8 +388,7 @@ public class BranchManager {
                 }
                 FileStoreTable branchTable =
                         FileStoreTableFactory.create(
-                                fileIO, new Path(getBranchPath(tablePath, 
branchName)));
-
+                                fileIO, new Path(getBranchPath(fileIO, 
tablePath, branchName)));
                 SortedMap<Snapshot, List<String>> snapshotTags = 
branchTable.tagManager().tags();
                 Long earliestSnapshotId = 
branchTable.snapshotManager().earliestSnapshotId();
                 if (snapshotTags.isEmpty()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 63679b86a..9813debe4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -47,7 +47,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.BranchManager.getBranchPath;
-import static org.apache.paimon.utils.BranchManager.isMainBranch;
 import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
 
 /** Manager for {@link Snapshot}, providing utility methods related to paths 
and snapshot hints. */
@@ -90,35 +89,27 @@ public class SnapshotManager implements Serializable {
     }
 
     public Path changelogDirectory() {
-        return isMainBranch(branch)
-                ? new Path(tablePath + "/changelog")
-                : new Path(getBranchPath(tablePath, branch) + "/changelog");
+        return new Path(getBranchPath(fileIO, tablePath, branch) + 
"/changelog");
     }
 
     public Path longLivedChangelogPath(long snapshotId) {
-        return isMainBranch(branch)
-                ? new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + 
snapshotId)
-                : new Path(
-                        getBranchPath(tablePath, branch)
-                                + "/changelog/"
-                                + CHANGELOG_PREFIX
-                                + snapshotId);
+        return new Path(
+                getBranchPath(fileIO, tablePath, branch)
+                        + "/changelog/"
+                        + CHANGELOG_PREFIX
+                        + snapshotId);
     }
 
     public Path snapshotPath(long snapshotId) {
-        return isMainBranch(branch)
-                ? new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + 
snapshotId)
-                : new Path(
-                        getBranchPath(tablePath, branch)
-                                + "/snapshot/"
-                                + SNAPSHOT_PREFIX
-                                + snapshotId);
+        return new Path(
+                getBranchPath(fileIO, tablePath, branch)
+                        + "/snapshot/"
+                        + SNAPSHOT_PREFIX
+                        + snapshotId);
     }
 
     public Path snapshotDirectory() {
-        return isMainBranch(branch)
-                ? new Path(tablePath + "/snapshot")
-                : new Path(getBranchPath(tablePath, branch) + "/snapshot");
+        return new Path(getBranchPath(fileIO, tablePath, branch) + 
"/snapshot");
     }
 
     public Snapshot snapshot(long snapshotId) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index c96bbdd56..6c59ef53c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -47,7 +47,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.BranchManager.getBranchPath;
-import static org.apache.paimon.utils.BranchManager.isMainBranch;
 import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -79,16 +78,12 @@ public class TagManager {
 
     /** Return the root Directory of tags. */
     public Path tagDirectory() {
-        return isMainBranch(branch)
-                ? new Path(tablePath + "/tag")
-                : new Path(getBranchPath(tablePath, branch) + "/tag");
+        return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag");
     }
 
-    /** Return the path of a tag in branch. */
+    /** Return the path of a tag. */
     public Path tagPath(String tagName) {
-        return isMainBranch(branch)
-                ? new Path(tablePath + "/tag/" + TAG_PREFIX + tagName)
-                : new Path(getBranchPath(tablePath, branch) + "/tag/" + 
TAG_PREFIX + tagName);
+        return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag/" + 
TAG_PREFIX + tagName);
     }
 
     /** Create a tag from given snapshot and save it in the storage. */
@@ -237,7 +232,7 @@ public class TagManager {
                 taggedSnapshot, 
tagDeletion.manifestSkippingSet(skippedSnapshots));
     }
 
-    /** Check if a branch tag exists. */
+    /** Check if a tag exists. */
     public boolean tagExists(String tagName) {
         Path path = tagPath(tagName);
         try {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java
new file mode 100644
index 000000000..10ef4a67a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Replace branch procedure for given branch. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.replace_branch('tableId', 'branchName')
+ * </code></pre>
+ */
+public class ReplaceBranchProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "replace_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    public String[] call(ProcedureContext procedureContext, String tableId, 
String branchName)
+            throws Catalog.TableNotExistException {
+        return innerCall(tableId, branchName);
+    }
+
+    private String[] innerCall(String tableId, String branchName)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        table.replaceBranch(branchName);
+        return new String[] {"Success"};
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 848dd317d..33a43009d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -51,3 +51,4 @@ 
org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
 org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
 org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
 org.apache.paimon.flink.procedure.RepairProcedure
+org.apache.paimon.flink.procedure.ReplaceBranchProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index 007d1ac5c..209b0d2e7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -21,15 +21,22 @@ package org.apache.paimon.flink.action;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import org.junit.Assert;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -115,7 +122,6 @@ class BranchActionITCase extends ActionITCaseBase {
                         "CALL sys.create_branch('%s.%s', 
'branch_name_with_snapshotId', 2)",
                         database, tableName));
         
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isTrue();
-        branchManager.branches();
 
         callProcedure(
                 String.format(
@@ -163,4 +169,90 @@ class BranchActionITCase extends ActionITCaseBase {
                         database, tableName));
         assertThat(branchManager.branchExists("empty_branch_name")).isFalse();
     }
+
+    @Test
+    void testReplaceBranch() throws Exception {
+        init(warehouse);
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // 3 snapshots
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        // Create tag2
+        TagManager tagManager = new TagManager(table.fileIO(), 
table.location());
+        callProcedure(
+                String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", 
database, tableName));
+        assertThat(tagManager.tagExists("tag2")).isTrue();
+
+        // Create replace_branch_name branch
+        BranchManager branchManager = table.branchManager();
+        callProcedure(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 
'replace_branch_name', 'tag2')",
+                        database, tableName));
+        assertThat(branchManager.branchExists("replace_branch_name")).isTrue();
+
+        // Replace branch
+        callProcedure(
+                String.format(
+                        "CALL sys.replace_branch('%s.%s', 
'replace_branch_name')",
+                        database, tableName));
+
+        // Check snapshot
+        SnapshotManager snapshotManager = table.snapshotManager();
+        assertThat(snapshotManager.snapshotExists(3)).isFalse();
+
+        // Renew write
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // Add data, forward to replace branch
+        for (long i = 4; i < 14; i++) {
+            writeData(rowData(i, 
BinaryString.fromString(String.format("new.data_%s", i))));
+        }
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        List<String> result =
+                getResult(
+                        readBuilder.newRead(),
+                        plan == null ? Collections.emptyList() : plan.splits(),
+                        rowType);
+        List<String> sortedActual = new ArrayList<>(result);
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, Hi]",
+                        "+I[2, Hello]",
+                        "+I[4, new.data_4]",
+                        "+I[5, new.data_5]",
+                        "+I[6, new.data_6]",
+                        "+I[7, new.data_7]",
+                        "+I[8, new.data_8]",
+                        "+I[9, new.data_9]",
+                        "+I[10, new.data_10]",
+                        "+I[11, new.data_11]",
+                        "+I[12, new.data_12]",
+                        "+I[13, new.data_13]");
+        Assert.assertEquals(expected, sortedActual);
+
+        callProcedure(
+                String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", 
database, tableName));
+        assertThat(tagManager.tagExists("tag3")).isTrue();
+    }
 }

Reply via email to