This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 85b8dee500 [core] Add IF NOT EXISTS support for create_branch 
operation (#7353)
85b8dee500 is described below

commit 85b8dee500ba25d03bb0ff7416387bc88cf4e58f
Author: Rhett CfZhuang <[email protected]>
AuthorDate: Sat Mar 7 14:20:38 2026 +0800

    [core] Add IF NOT EXISTS support for create_branch operation (#7353)
---
 .../java/org/apache/paimon/catalog/Catalog.java    |  22 +++
 .../org/apache/paimon/catalog/DelegateCatalog.java |   7 +
 .../paimon/privilege/PrivilegedFileStoreTable.java |  12 ++
 .../paimon/table/AbstractFileStoreTable.java       |  10 ++
 .../paimon/table/DelegatedFileStoreTable.java      |  10 ++
 .../java/org/apache/paimon/table/FormatTable.java  |  10 ++
 .../org/apache/paimon/table/ReadonlyTable.java     |  16 ++
 .../main/java/org/apache/paimon/table/Table.java   |  19 +++
 .../org/apache/paimon/utils/BranchManager.java     |  19 +++
 .../apache/paimon/utils/CatalogBranchManager.java  |  15 +-
 .../paimon/utils/FileSystemBranchManager.java      |  16 ++
 .../apache/paimon/table/SimpleTableTestBase.java   |  50 ++++++
 .../flink/procedure/CreateBranchProcedure.java     |  21 ++-
 .../paimon/flink/action/CreateBranchAction.java    |   9 +-
 .../flink/action/CreateBranchActionFactory.java    |   7 +-
 .../flink/procedure/CreateBranchProcedure.java     |  62 +++++++-
 .../paimon/flink/action/BranchActionITCase.java    | 169 ++++++++++++++++++++-
 .../flink/action/MultipleParameterToolAdapter.java |   8 +
 .../flink/action/MultipleParameterToolAdapter.java |   8 +
 .../spark/procedure/CreateBranchProcedure.java     |   8 +-
 20 files changed, 480 insertions(+), 18 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 5dfbaf2978..ef4a943e69 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -810,6 +810,28 @@ public interface Catalog extends AutoCloseable {
     void createBranch(Identifier identifier, String branch, @Nullable String 
fromTag)
             throws TableNotExistException, BranchAlreadyExistException, 
TagNotExistException;
 
+    /**
+     * Create a branch for this table with option to ignore if the branch 
already exists.
+     *
+     * @param identifier path of the table, cannot be system or branch name.
+     * @param branch the branch name
+     * @param fromTag from the tag
+     * @param ignoreIfExists if true, do nothing when branch already exists
+     * @throws TableNotExistException if the table in identifier doesn't exist
+     * @throws BranchAlreadyExistException if the branch already exists and 
ignoreIfExists is false
+     * @throws TagNotExistException if the tag doesn't exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
+     */
+    default void createBranch(
+            Identifier identifier, String branch, @Nullable String fromTag, 
boolean ignoreIfExists)
+            throws TableNotExistException, BranchAlreadyExistException, 
TagNotExistException {
+        if (ignoreIfExists && listBranches(identifier).contains(branch)) {
+            return;
+        }
+        createBranch(identifier, branch, fromTag);
+    }
+
     /**
      * Drop the branch for this table.
      *
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 3f1ea209a9..81558ac68a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -228,6 +228,13 @@ public abstract class DelegateCatalog implements Catalog {
         wrapped.createBranch(identifier, branch, fromTag);
     }
 
+    @Override
+    public void createBranch(
+            Identifier identifier, String branch, @Nullable String fromTag, 
boolean ignoreIfExists)
+            throws TableNotExistException, BranchAlreadyExistException, 
TagNotExistException {
+        wrapped.createBranch(identifier, branch, fromTag, ignoreIfExists);
+    }
+
     @Override
     public void dropBranch(Identifier identifier, String branch) throws 
BranchNotExistException {
         wrapped.dropBranch(identifier, branch);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index ad6e998d45..a2c3e62c36 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -180,6 +180,18 @@ public class PrivilegedFileStoreTable extends 
DelegatedFileStoreTable {
         wrapped.createBranch(branchName, tagName);
     }
 
+    @Override
+    public void createBranch(String branchName, boolean ignoreIfExists) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.createBranch(branchName, ignoreIfExists);
+    }
+
+    @Override
+    public void createBranch(String branchName, String tagName, boolean 
ignoreIfExists) {
+        privilegeChecker.assertCanInsert(identifier);
+        wrapped.createBranch(branchName, tagName, ignoreIfExists);
+    }
+
     @Override
     public void deleteBranch(String branchName) {
         privilegeChecker.assertCanInsert(identifier);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index b02b0ea958..014df441ac 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -673,6 +673,16 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         branchManager().createBranch(branchName, tagName);
     }
 
+    @Override
+    public void createBranch(String branchName, boolean ignoreIfExists) {
+        branchManager().createBranch(branchName, ignoreIfExists);
+    }
+
+    @Override
+    public void createBranch(String branchName, String tagName, boolean 
ignoreIfExists) {
+        branchManager().createBranch(branchName, tagName, ignoreIfExists);
+    }
+
     @Override
     public void deleteBranch(String branchName) {
         String fallbackBranch =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 3a0d4ecc1c..b92c4c1630 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -265,6 +265,16 @@ public abstract class DelegatedFileStoreTable implements 
FileStoreTable {
         wrapped.createBranch(branchName, tagName);
     }
 
+    @Override
+    public void createBranch(String branchName, boolean ignoreIfExists) {
+        wrapped.createBranch(branchName, ignoreIfExists);
+    }
+
+    @Override
+    public void createBranch(String branchName, String tagName, boolean 
ignoreIfExists) {
+        wrapped.createBranch(branchName, tagName, ignoreIfExists);
+    }
+
     @Override
     public void deleteBranch(String branchName) {
         wrapped.deleteBranch(branchName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index af6dd9ca26..f1fbc74c12 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -384,6 +384,16 @@ public interface FormatTable extends Table {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    default void createBranch(String branchName, boolean ignoreIfExists) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    default void createBranch(String branchName, String tagName, boolean 
ignoreIfExists) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     default void deleteBranch(String branchName) {
         throw new UnsupportedOperationException();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index cb6e3d3fb3..c5b8deeca9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -233,6 +233,22 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
+    @Override
+    default void createBranch(String branchName, boolean ignoreIfExists) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support create branch.",
+                        this.getClass().getSimpleName()));
+    }
+
+    @Override
+    default void createBranch(String branchName, String tagName, boolean 
ignoreIfExists) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support create branch.",
+                        this.getClass().getSimpleName()));
+    }
+
     @Override
     default void deleteBranch(String branchName) {
         throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java 
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 9c1f07f3aa..8d49d7206e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -165,6 +165,25 @@ public interface Table extends Serializable {
     @Experimental
     void createBranch(String branchName, String tagName);
 
+    /**
+     * Create an empty branch with option to ignore if the branch already 
exists.
+     *
+     * @param branchName the branch name
+     * @param ignoreIfExists if true, do nothing when branch already exists
+     */
+    @Experimental
+    void createBranch(String branchName, boolean ignoreIfExists);
+
+    /**
+     * Create a branch from given tag with option to ignore if the branch 
already exists.
+     *
+     * @param branchName the branch name
+     * @param tagName the tag name to create branch from
+     * @param ignoreIfExists if true, do nothing when branch already exists
+     */
+    @Experimental
+    void createBranch(String branchName, String tagName, boolean 
ignoreIfExists);
+
     /** Delete a branch by branchName. */
     @Experimental
     void deleteBranch(String branchName);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index fc3defa8f3..e055724efe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -36,6 +36,25 @@ public interface BranchManager {
 
     void createBranch(String branchName, @Nullable String tagName);
 
+    /**
+     * Create a branch with option to ignore if the branch already exists.
+     *
+     * @param branchName the branch name
+     * @param ignoreIfExists if true, do nothing when branch already exists; 
if false, throw
+     *     exception
+     */
+    void createBranch(String branchName, boolean ignoreIfExists);
+
+    /**
+     * Create a branch from tag with option to ignore if the branch already 
exists.
+     *
+     * @param branchName the branch name
+     * @param tagName the tag name to create branch from
+     * @param ignoreIfExists if true, do nothing when branch already exists; 
if false, throw
+     *     exception
+     */
+    void createBranch(String branchName, @Nullable String tagName, boolean 
ignoreIfExists);
+
     void dropBranch(String branchName);
 
     void fastForward(String branchName);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
index a395092456..8723135bd0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
@@ -68,14 +68,27 @@ public class CatalogBranchManager implements BranchManager {
 
     @Override
     public void createBranch(String branchName) {
-        executePost(catalog -> catalog.createBranch(identifier, branchName, 
null));
+        createBranch(branchName, false);
     }
 
     @Override
     public void createBranch(String branchName, @Nullable String tagName) {
+        createBranch(branchName, tagName, false);
+    }
+
+    @Override
+    public void createBranch(String branchName, boolean ignoreIfExists) {
+        createBranch(branchName, null, ignoreIfExists);
+    }
+
+    @Override
+    public void createBranch(String branchName, @Nullable String tagName, 
boolean ignoreIfExists) {
         executePost(
                 catalog -> {
                     BranchManager.validateBranch(branchName);
+                    if (ignoreIfExists && 
catalog.listBranches(identifier).contains(branchName)) {
+                        return;
+                    }
                     catalog.createBranch(identifier, branchName, tagName);
                 });
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
index 7a1bdbef29..ef3a04ec99 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
@@ -73,6 +73,14 @@ public class FileSystemBranchManager implements 
BranchManager {
 
     @Override
     public void createBranch(String branchName) {
+        createBranch(branchName, false);
+    }
+
+    @Override
+    public void createBranch(String branchName, boolean ignoreIfExists) {
+        if (ignoreIfExists && branchExists(branchName)) {
+            return;
+        }
         validateBranch(branchName);
         try {
             TableSchema latestSchema = schemaManager.latest().get();
@@ -88,6 +96,14 @@ public class FileSystemBranchManager implements 
BranchManager {
 
     @Override
     public void createBranch(String branchName, String tagName) {
+        createBranch(branchName, tagName, false);
+    }
+
+    @Override
+    public void createBranch(String branchName, String tagName, boolean 
ignoreIfExists) {
+        if (ignoreIfExists && branchExists(branchName)) {
+            return;
+        }
         validateBranch(branchName);
         Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot();
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index a310404f53..1b58b012cd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -1134,6 +1134,56 @@ public abstract class SimpleTableTestBase {
                                 "Branch name cannot be pure numeric string but 
is '10'."));
     }
 
+    @Test
+    public void testCreateBranchWithIgnoreIfExists() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                StreamTableCommit commit = table.newCommit(commitUser)) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(0, write.prepareCommit(false, 1));
+            write.write(rowData(2, 20, 200L));
+            commit.commit(1, write.prepareCommit(false, 2));
+        }
+
+        table.createTag("test-tag", 2);
+        BranchManager branchManager = table.branchManager();
+
+        // Test create branch with ignoreIfExists=true (branch doesn't exist)
+        table.createBranch("new-branch", "test-tag", true);
+        assertThat(branchManager.branchExists("new-branch")).isTrue();
+
+        // Test create branch with ignoreIfExists=false (branch doesn't exist)
+        table.createBranch("another-branch", "test-tag", false);
+        assertThat(branchManager.branchExists("another-branch")).isTrue();
+
+        // Test create existing branch with ignoreIfExists=true (should 
succeed silently)
+        table.createBranch("new-branch", "test-tag", true);
+        assertThat(branchManager.branchExists("new-branch")).isTrue();
+
+        // Test create existing branch with ignoreIfExists=false (should throw 
exception)
+        assertThatThrownBy(() -> table.createBranch("new-branch", "test-tag", 
false))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch name 'new-branch' already exists."));
+
+        // Test create empty branch with ignoreIfExists
+        table.createBranch("empty-branch", true);
+        assertThat(branchManager.branchExists("empty-branch")).isTrue();
+
+        // Test create existing empty branch with ignoreIfExists=true
+        table.createBranch("empty-branch", true);
+        assertThat(branchManager.branchExists("empty-branch")).isTrue();
+
+        // Test create existing empty branch with ignoreIfExists=false
+        assertThatThrownBy(() -> table.createBranch("empty-branch", false))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Branch name 'empty-branch' already exists."));
+    }
+
     @Test
     public void testDeleteBranch() throws Exception {
         FileStoreTable table = createFileStoreTable();
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
index 0bf2292b14..a7a8fb9952 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
@@ -45,21 +45,32 @@ public class CreateBranchProcedure extends ProcedureBase {
     public String[] call(
             ProcedureContext procedureContext, String tableId, String 
branchName, String tagName)
             throws Catalog.TableNotExistException {
-        return innerCall(tableId, branchName, tagName);
+        return innerCall(tableId, branchName, tagName, false);
     }
 
     public String[] call(ProcedureContext procedureContext, String tableId, 
String branchName)
             throws Catalog.TableNotExistException {
-        return innerCall(tableId, branchName, null);
+        return innerCall(tableId, branchName, null, false);
     }
 
-    private String[] innerCall(String tableId, String branchName, String 
tagName)
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String branchName,
+            String tagName,
+            Boolean ignoreIfExists)
+            throws Catalog.TableNotExistException {
+        return innerCall(tableId, branchName, tagName, ignoreIfExists != null 
&& ignoreIfExists);
+    }
+
+    private String[] innerCall(
+            String tableId, String branchName, String tagName, boolean 
ignoreIfExists)
             throws Catalog.TableNotExistException {
         Table table = catalog.getTable(Identifier.fromString(tableId));
         if (!StringUtils.isBlank(tagName)) {
-            table.createBranch(branchName, tagName);
+            table.createBranch(branchName, tagName, ignoreIfExists);
         } else {
-            table.createBranch(branchName);
+            table.createBranch(branchName, ignoreIfExists);
         }
         return new String[] {"Success"};
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java
index bfa765dd75..0d681cae73 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java
@@ -26,24 +26,27 @@ import java.util.Map;
 public class CreateBranchAction extends TableActionBase implements LocalAction 
{
     private final String branchName;
     private final String tagName;
+    private final boolean ignoreIfExists;
 
     public CreateBranchAction(
             String databaseName,
             String tableName,
             Map<String, String> catalogConfig,
             String branchName,
-            String tagName) {
+            String tagName,
+            boolean ignoreIfExists) {
         super(databaseName, tableName, catalogConfig);
         this.branchName = branchName;
         this.tagName = tagName;
+        this.ignoreIfExists = ignoreIfExists;
     }
 
     @Override
     public void executeLocally() {
         if (!StringUtils.isBlank(tagName)) {
-            table.createBranch(branchName, tagName);
+            table.createBranch(branchName, tagName, ignoreIfExists);
         } else {
-            table.createBranch(branchName);
+            table.createBranch(branchName, ignoreIfExists);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java
index 69dc1b3792..9ff1ef7896 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java
@@ -27,6 +27,7 @@ public class CreateBranchActionFactory implements 
ActionFactory {
 
     private static final String TAG_NAME = "tag_name";
     private static final String BRANCH_NAME = "branch_name";
+    private static final String IGNORE_IF_EXISTS = "ignore_if_exists";
 
     @Override
     public String identifier() {
@@ -41,7 +42,8 @@ public class CreateBranchActionFactory implements 
ActionFactory {
                         params.getRequired(TABLE),
                         catalogConfigMap(params),
                         params.getRequired(BRANCH_NAME),
-                        params.get(TAG_NAME));
+                        params.get(TAG_NAME),
+                        params.getBoolean(IGNORE_IF_EXISTS, false));
         return Optional.of(action);
     }
 
@@ -57,7 +59,8 @@ public class CreateBranchActionFactory implements 
ActionFactory {
                         + "--database <database_name> \\\n"
                         + "--table <table_name> \\\n"
                         + "--branch_name <branch_name> \\\n"
-                        + "[--tag_name <tag_name>]");
+                        + "[--tag_name <tag_name>] \\\n"
+                        + "[--ignore_if_exists <true/false>]");
         System.out.println();
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
index 897f2d6bda..90cb5f8cd9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
@@ -33,7 +33,10 @@ import org.apache.flink.table.procedure.ProcedureContext;
  * Create branch procedure for given tag. Usage:
  *
  * <pre><code>
+ *  CALL sys.create_branch('tableId', 'branchName')
  *  CALL sys.create_branch('tableId', 'branchName', 'tagName')
+ *  CALL sys.create_branch('tableId', 'branchName', true)
+ *  CALL sys.create_branch('tableId', 'branchName', 'tagName', true)
  * </code></pre>
  */
 public class CreateBranchProcedure extends ProcedureBase {
@@ -45,6 +48,44 @@ public class CreateBranchProcedure extends ProcedureBase {
         return IDENTIFIER;
     }
 
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "branch", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "tag", type = @DataTypeHint("STRING"), 
isOptional = true),
+                @ArgumentHint(
+                        name = "ignoreIfExists",
+                        type = @DataTypeHint("BOOLEAN"),
+                        isOptional = true)
+            })
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String branchName,
+            String tagName,
+            Boolean ignoreIfExists)
+            throws Catalog.TableNotExistException {
+        return innerCall(tableId, branchName, tagName, ignoreIfExists);
+    }
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "branch", type = @DataTypeHint("STRING")),
+                @ArgumentHint(
+                        name = "ignoreIfExists",
+                        type = @DataTypeHint("BOOLEAN"),
+                        isOptional = true)
+            })
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String branchName,
+            Boolean ignoreIfExists)
+            throws Catalog.TableNotExistException {
+        return innerCall(tableId, branchName, null, ignoreIfExists);
+    }
+
     @ProcedureHint(
             argument = {
                 @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@@ -54,11 +95,28 @@ public class CreateBranchProcedure extends ProcedureBase {
     public String[] call(
             ProcedureContext procedureContext, String tableId, String 
branchName, String tagName)
             throws Catalog.TableNotExistException {
+        return innerCall(tableId, branchName, tagName, false);
+    }
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "branch", type = @DataTypeHint("STRING"))
+            })
+    public String[] call(ProcedureContext procedureContext, String tableId, 
String branchName)
+            throws Catalog.TableNotExistException {
+        return innerCall(tableId, branchName, null, false);
+    }
+
+    private String[] innerCall(
+            String tableId, String branchName, String tagName, Boolean 
ignoreIfExists)
+            throws Catalog.TableNotExistException {
         Table table = catalog.getTable(Identifier.fromString(tableId));
+        boolean ignore = ignoreIfExists != null && ignoreIfExists;
         if (!StringUtils.isBlank(tagName)) {
-            table.createBranch(branchName, tagName);
+            table.createBranch(branchName, tagName, ignore);
         } else {
-            table.createBranch(branchName);
+            table.createBranch(branchName, ignore);
         }
         return new String[] {"Success"};
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index e2387a8c38..c0ece12168 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -30,6 +30,7 @@ import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -87,7 +88,7 @@ public class BranchActionITCase extends ActionITCaseBase {
 
         executeSQL(
                 String.format(
-                        "CALL sys.create_branch(`table` => '%s.%s', branch => 
'branch_name_named_argument', tag => 'tag2')",
+                        "CALL sys.create_branch('%s.%s', 
'branch_name_named_argument', 'tag2')",
                         database, tableName));
         
assertThat(branchManager.branchExists("branch_name_named_argument")).isTrue();
 
@@ -168,7 +169,7 @@ public class BranchActionITCase extends ActionITCaseBase {
 
         executeSQL(
                 String.format(
-                        "CALL sys.create_branch(`table` => '%s.%s', branch => 
'empty_branch_named_argument')",
+                        "CALL sys.create_branch('%s.%s', 
'empty_branch_named_argument')",
                         database, tableName));
         
assertThat(branchManager.branchExists("empty_branch_named_argument")).isTrue();
 
@@ -213,6 +214,170 @@ public class BranchActionITCase extends ActionITCaseBase {
         assertThat(branchManager.branchExists("empty_branch_name")).isFalse();
     }
 
+    @Test
+    void testCreateBranchWithIgnoreIfExists() throws Exception {
+        init(warehouse);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // 3 snapshots
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        TagManager tagManager = new TagManager(table.fileIO(), 
table.location());
+        executeSQL(String.format("CALL sys.create_tag('%s.%s', 'tag1', 1)", 
database, tableName));
+        assertThat(tagManager.tagExists("tag1")).isTrue();
+
+        BranchManager branchManager = table.branchManager();
+
+        // Create branch without ignoreIfExists
+        executeSQL(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'branch_if_exists', 
'tag1')",
+                        database, tableName));
+        assertThat(branchManager.branchExists("branch_if_exists")).isTrue();
+
+        // Try to create the same branch again without ignoreIfExists - should 
throw exception
+        try {
+            executeSQL(
+                    String.format(
+                            "CALL sys.create_branch('%s.%s', 
'branch_if_exists', 'tag1')",
+                            database, tableName));
+            Assertions.fail("Expected exception not thrown");
+        } catch (Exception e) {
+            assertThat(e.getMessage()).contains("already exists");
+        }
+
+        // Create branch with ignoreIfExists=true - should succeed silently
+        executeSQL(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'branch_if_exists', 
'tag1', true)",
+                        database, tableName));
+        assertThat(branchManager.branchExists("branch_if_exists")).isTrue();
+
+        // Create branch with ignoreIfExists=false - should throw exception
+        try {
+            executeSQL(
+                    String.format(
+                            "CALL sys.create_branch('%s.%s', 
'branch_if_exists', 'tag1', false)",
+                            database, tableName));
+            Assertions.fail("Expected exception not thrown");
+        } catch (Exception e) {
+            assertThat(e.getMessage()).contains("already exists");
+        }
+
+        // Test with action API
+        createAction(
+                        CreateBranchAction.class,
+                        "create_branch",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--branch_name",
+                        "branch_if_exists",
+                        "--tag_name",
+                        "tag1",
+                        "--ignore_if_exists",
+                        "true")
+                .run();
+        assertThat(branchManager.branchExists("branch_if_exists")).isTrue();
+
+        // Test creating new branch with ignoreIfExists=true (branch doesn't 
exist yet)
+        executeSQL(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'new_branch', 'tag1', 
true)",
+                        database, tableName));
+        assertThat(branchManager.branchExists("new_branch")).isTrue();
+
+        // Test creating new branch with ignoreIfExists=false (branch doesn't 
exist yet)
+        executeSQL(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'another_branch', 
'tag1', false)",
+                        database, tableName));
+        assertThat(branchManager.branchExists("another_branch")).isTrue();
+    }
+
+    @Test
+    void testCreateEmptyBranchWithIgnoreIfExists() throws Exception {
+        init(warehouse);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // 3 snapshots
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        BranchManager branchManager = table.branchManager();
+
+        // Create empty branch
+        executeSQL(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'empty_branch')", 
database, tableName));
+        assertThat(branchManager.branchExists("empty_branch")).isTrue();
+
+        // Try to create the same empty branch again without ignoreIfExists - 
should throw exception
+        try {
+            executeSQL(
+                    String.format(
+                            "CALL sys.create_branch('%s.%s', 'empty_branch')",
+                            database, tableName));
+            Assertions.fail("Expected exception not thrown");
+        } catch (Exception e) {
+            assertThat(e.getMessage()).contains("already exists");
+        }
+
+        // Create empty branch with ignoreIfExists=true - should succeed 
silently
+        executeSQL(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'empty_branch', 
true)",
+                        database, tableName));
+        assertThat(branchManager.branchExists("empty_branch")).isTrue();
+
+        // Create empty branch with ignoreIfExists=false - should throw 
exception
+        try {
+            executeSQL(
+                    String.format(
+                            "CALL sys.create_branch('%s.%s', 'empty_branch', 
false)",
+                            database, tableName));
+            Assertions.fail("Expected exception not thrown");
+        } catch (Exception e) {
+            assertThat(e.getMessage()).contains("already exists");
+        }
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     void testFastForward(boolean forceStartFlinkJob) throws Exception {
diff --git 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
index c08cff88e1..99b03327ae 100644
--- 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
@@ -64,6 +64,14 @@ public class MultipleParameterToolAdapter {
         return value;
     }
 
+    public Boolean getBoolean(String key, Boolean defaultValue) {
+        String value = get(key);
+        if (value == null) {
+            return defaultValue;
+        }
+        return Boolean.parseBoolean(value);
+    }
+
     public MultipleParameterToolAdapter mergeWith(MultipleParameterToolAdapter 
other) {
         return new 
MultipleParameterToolAdapter(this.params.mergeWith(other.params));
     }
diff --git 
a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
 
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
index b545748195..d140c15e96 100644
--- 
a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
+++ 
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
@@ -64,6 +64,14 @@ public class MultipleParameterToolAdapter {
         return value;
     }
 
+    public Boolean getBoolean(String key, Boolean defaultValue) {
+        String value = get(key);
+        if (value == null) {
+            return defaultValue;
+        }
+        return Boolean.parseBoolean(value);
+    }
+
     public MultipleParameterToolAdapter mergeWith(MultipleParameterToolAdapter 
other) {
         return new 
MultipleParameterToolAdapter(this.params.mergeWith(other.params));
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateBranchProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateBranchProcedure.java
index 2a0e0be02b..2953d596e1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateBranchProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateBranchProcedure.java
@@ -35,7 +35,8 @@ public class CreateBranchProcedure extends BaseProcedure {
             new ProcedureParameter[] {
                 ProcedureParameter.required("table", StringType),
                 ProcedureParameter.required("branch", StringType),
-                ProcedureParameter.optional("tag", StringType)
+                ProcedureParameter.optional("tag", StringType),
+                ProcedureParameter.optional("ignoreIfExists", StringType)
             };
 
     private static final StructType OUTPUT_TYPE =
@@ -63,14 +64,15 @@ public class CreateBranchProcedure extends BaseProcedure {
         Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
         String branch = args.getString(1);
         String tag = args.isNullAt(2) ? null : args.getString(2);
+        boolean ignoreIfExists = !args.isNullAt(3) && 
Boolean.parseBoolean(args.getString(3));
 
         return modifyPaimonTable(
                 tableIdent,
                 table -> {
                     if (tag != null) {
-                        table.createBranch(branch, tag);
+                        table.createBranch(branch, tag, ignoreIfExists);
                     } else {
-                        table.createBranch(branch);
+                        table.createBranch(branch, ignoreIfExists);
                     }
 
                     InternalRow outputRow = newInternalRow(true);


Reply via email to