This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 85b8dee500 [core] Add IF NOT EXISTS support for create_branch operation (#7353)
85b8dee500 is described below
commit 85b8dee500ba25d03bb0ff7416387bc88cf4e58f
Author: Rhett CfZhuang <[email protected]>
AuthorDate: Sat Mar 7 14:20:38 2026 +0800
[core] Add IF NOT EXISTS support for create_branch operation (#7353)
---
.../java/org/apache/paimon/catalog/Catalog.java | 22 +++
.../org/apache/paimon/catalog/DelegateCatalog.java | 7 +
.../paimon/privilege/PrivilegedFileStoreTable.java | 12 ++
.../paimon/table/AbstractFileStoreTable.java | 10 ++
.../paimon/table/DelegatedFileStoreTable.java | 10 ++
.../java/org/apache/paimon/table/FormatTable.java | 10 ++
.../org/apache/paimon/table/ReadonlyTable.java | 16 ++
.../main/java/org/apache/paimon/table/Table.java | 19 +++
.../org/apache/paimon/utils/BranchManager.java | 19 +++
.../apache/paimon/utils/CatalogBranchManager.java | 15 +-
.../paimon/utils/FileSystemBranchManager.java | 16 ++
.../apache/paimon/table/SimpleTableTestBase.java | 50 ++++++
.../flink/procedure/CreateBranchProcedure.java | 21 ++-
.../paimon/flink/action/CreateBranchAction.java | 9 +-
.../flink/action/CreateBranchActionFactory.java | 7 +-
.../flink/procedure/CreateBranchProcedure.java | 62 +++++++-
.../paimon/flink/action/BranchActionITCase.java | 169 ++++++++++++++++++++-
.../flink/action/MultipleParameterToolAdapter.java | 8 +
.../flink/action/MultipleParameterToolAdapter.java | 8 +
.../spark/procedure/CreateBranchProcedure.java | 8 +-
20 files changed, 480 insertions(+), 18 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 5dfbaf2978..ef4a943e69 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -810,6 +810,28 @@ public interface Catalog extends AutoCloseable {
void createBranch(Identifier identifier, String branch, @Nullable String
fromTag)
throws TableNotExistException, BranchAlreadyExistException,
TagNotExistException;
+ /**
+ * Create a branch for this table with option to ignore if the branch
already exists.
+ *
+ * @param identifier path of the table, cannot be system or branch name.
+ * @param branch the branch name
+ * @param fromTag from the tag
+ * @param ignoreIfExists if true, do nothing when branch already exists
+ * @throws TableNotExistException if the table in identifier doesn't exist
+ * @throws BranchAlreadyExistException if the branch already exists and
ignoreIfExists is false
+ * @throws TagNotExistException if the tag doesn't exist
+ * @throws UnsupportedOperationException if the catalog does not {@link
+ * #supportsVersionManagement()}
+ */
+ default void createBranch(
+ Identifier identifier, String branch, @Nullable String fromTag,
boolean ignoreIfExists)
+ throws TableNotExistException, BranchAlreadyExistException,
TagNotExistException {
+ if (ignoreIfExists && listBranches(identifier).contains(branch)) {
+ return;
+ }
+ createBranch(identifier, branch, fromTag);
+ }
+
/**
* Drop the branch for this table.
*
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 3f1ea209a9..81558ac68a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -228,6 +228,13 @@ public abstract class DelegateCatalog implements Catalog {
wrapped.createBranch(identifier, branch, fromTag);
}
+ @Override
+ public void createBranch(
+ Identifier identifier, String branch, @Nullable String fromTag,
boolean ignoreIfExists)
+ throws TableNotExistException, BranchAlreadyExistException,
TagNotExistException {
+ wrapped.createBranch(identifier, branch, fromTag, ignoreIfExists);
+ }
+
@Override
public void dropBranch(Identifier identifier, String branch) throws
BranchNotExistException {
wrapped.dropBranch(identifier, branch);
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index ad6e998d45..a2c3e62c36 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -180,6 +180,18 @@ public class PrivilegedFileStoreTable extends DelegatedFileStoreTable {
wrapped.createBranch(branchName, tagName);
}
+ @Override
+ public void createBranch(String branchName, boolean ignoreIfExists) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.createBranch(branchName, ignoreIfExists);
+ }
+
+ @Override
+ public void createBranch(String branchName, String tagName, boolean
ignoreIfExists) {
+ privilegeChecker.assertCanInsert(identifier);
+ wrapped.createBranch(branchName, tagName, ignoreIfExists);
+ }
+
@Override
public void deleteBranch(String branchName) {
privilegeChecker.assertCanInsert(identifier);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index b02b0ea958..014df441ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -673,6 +673,16 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
branchManager().createBranch(branchName, tagName);
}
+ @Override
+ public void createBranch(String branchName, boolean ignoreIfExists) {
+ branchManager().createBranch(branchName, ignoreIfExists);
+ }
+
+ @Override
+ public void createBranch(String branchName, String tagName, boolean
ignoreIfExists) {
+ branchManager().createBranch(branchName, tagName, ignoreIfExists);
+ }
+
@Override
public void deleteBranch(String branchName) {
String fallbackBranch =
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 3a0d4ecc1c..b92c4c1630 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -265,6 +265,16 @@ public abstract class DelegatedFileStoreTable implements FileStoreTable {
wrapped.createBranch(branchName, tagName);
}
+ @Override
+ public void createBranch(String branchName, boolean ignoreIfExists) {
+ wrapped.createBranch(branchName, ignoreIfExists);
+ }
+
+ @Override
+ public void createBranch(String branchName, String tagName, boolean
ignoreIfExists) {
+ wrapped.createBranch(branchName, tagName, ignoreIfExists);
+ }
+
@Override
public void deleteBranch(String branchName) {
wrapped.deleteBranch(branchName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index af6dd9ca26..f1fbc74c12 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -384,6 +384,16 @@ public interface FormatTable extends Table {
throw new UnsupportedOperationException();
}
+ @Override
+ default void createBranch(String branchName, boolean ignoreIfExists) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default void createBranch(String branchName, String tagName, boolean
ignoreIfExists) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
default void deleteBranch(String branchName) {
throw new UnsupportedOperationException();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index cb6e3d3fb3..c5b8deeca9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -233,6 +233,22 @@ public interface ReadonlyTable extends InnerTable {
this.getClass().getSimpleName()));
}
+ @Override
+ default void createBranch(String branchName, boolean ignoreIfExists) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Readonly Table %s does not support create branch.",
+ this.getClass().getSimpleName()));
+ }
+
+ @Override
+ default void createBranch(String branchName, String tagName, boolean
ignoreIfExists) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Readonly Table %s does not support create branch.",
+ this.getClass().getSimpleName()));
+ }
+
@Override
default void deleteBranch(String branchName) {
throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 9c1f07f3aa..8d49d7206e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -165,6 +165,25 @@ public interface Table extends Serializable {
@Experimental
void createBranch(String branchName, String tagName);
+ /**
+ * Create an empty branch with option to ignore if the branch already
exists.
+ *
+ * @param branchName the branch name
+ * @param ignoreIfExists if true, do nothing when branch already exists
+ */
+ @Experimental
+ void createBranch(String branchName, boolean ignoreIfExists);
+
+ /**
+ * Create a branch from given tag with option to ignore if the branch
already exists.
+ *
+ * @param branchName the branch name
+ * @param tagName the tag name to create branch from
+ * @param ignoreIfExists if true, do nothing when branch already exists
+ */
+ @Experimental
+ void createBranch(String branchName, String tagName, boolean
ignoreIfExists);
+
/** Delete a branch by branchName. */
@Experimental
void deleteBranch(String branchName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index fc3defa8f3..e055724efe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -36,6 +36,25 @@ public interface BranchManager {
void createBranch(String branchName, @Nullable String tagName);
+ /**
+ * Create a branch with option to ignore if the branch already exists.
+ *
+ * @param branchName the branch name
+ * @param ignoreIfExists if true, do nothing when branch already exists;
if false, throw
+ * exception
+ */
+ void createBranch(String branchName, boolean ignoreIfExists);
+
+ /**
+ * Create a branch from tag with option to ignore if the branch already
exists.
+ *
+ * @param branchName the branch name
+ * @param tagName the tag name to create branch from
+ * @param ignoreIfExists if true, do nothing when branch already exists;
if false, throw
+ * exception
+ */
+ void createBranch(String branchName, @Nullable String tagName, boolean
ignoreIfExists);
+
void dropBranch(String branchName);
void fastForward(String branchName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
index a395092456..8723135bd0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
@@ -68,14 +68,27 @@ public class CatalogBranchManager implements BranchManager {
@Override
public void createBranch(String branchName) {
- executePost(catalog -> catalog.createBranch(identifier, branchName,
null));
+ createBranch(branchName, false);
}
@Override
public void createBranch(String branchName, @Nullable String tagName) {
+ createBranch(branchName, tagName, false);
+ }
+
+ @Override
+ public void createBranch(String branchName, boolean ignoreIfExists) {
+ createBranch(branchName, null, ignoreIfExists);
+ }
+
+ @Override
+ public void createBranch(String branchName, @Nullable String tagName,
boolean ignoreIfExists) {
executePost(
catalog -> {
BranchManager.validateBranch(branchName);
+ if (ignoreIfExists &&
catalog.listBranches(identifier).contains(branchName)) {
+ return;
+ }
catalog.createBranch(identifier, branchName, tagName);
});
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
index 7a1bdbef29..ef3a04ec99 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
@@ -73,6 +73,14 @@ public class FileSystemBranchManager implements BranchManager {
@Override
public void createBranch(String branchName) {
+ createBranch(branchName, false);
+ }
+
+ @Override
+ public void createBranch(String branchName, boolean ignoreIfExists) {
+ if (ignoreIfExists && branchExists(branchName)) {
+ return;
+ }
validateBranch(branchName);
try {
TableSchema latestSchema = schemaManager.latest().get();
@@ -88,6 +96,14 @@ public class FileSystemBranchManager implements BranchManager {
@Override
public void createBranch(String branchName, String tagName) {
+ createBranch(branchName, tagName, false);
+ }
+
+ @Override
+ public void createBranch(String branchName, String tagName, boolean
ignoreIfExists) {
+ if (ignoreIfExists && branchExists(branchName)) {
+ return;
+ }
validateBranch(branchName);
Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index a310404f53..1b58b012cd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -1134,6 +1134,56 @@ public abstract class SimpleTableTestBase {
"Branch name cannot be pure numeric string but
is '10'."));
}
+ @Test
+ public void testCreateBranchWithIgnoreIfExists() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser)) {
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 1));
+ write.write(rowData(2, 20, 200L));
+ commit.commit(1, write.prepareCommit(false, 2));
+ }
+
+ table.createTag("test-tag", 2);
+ BranchManager branchManager = table.branchManager();
+
+ // Test create branch with ignoreIfExists=true (branch doesn't exist)
+ table.createBranch("new-branch", "test-tag", true);
+ assertThat(branchManager.branchExists("new-branch")).isTrue();
+
+ // Test create branch with ignoreIfExists=false (branch doesn't exist)
+ table.createBranch("another-branch", "test-tag", false);
+ assertThat(branchManager.branchExists("another-branch")).isTrue();
+
+ // Test create existing branch with ignoreIfExists=true (should
succeed silently)
+ table.createBranch("new-branch", "test-tag", true);
+ assertThat(branchManager.branchExists("new-branch")).isTrue();
+
+ // Test create existing branch with ignoreIfExists=false (should throw
exception)
+ assertThatThrownBy(() -> table.createBranch("new-branch", "test-tag",
false))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Branch name 'new-branch' already exists."));
+
+ // Test create empty branch with ignoreIfExists
+ table.createBranch("empty-branch", true);
+ assertThat(branchManager.branchExists("empty-branch")).isTrue();
+
+ // Test create existing empty branch with ignoreIfExists=true
+ table.createBranch("empty-branch", true);
+ assertThat(branchManager.branchExists("empty-branch")).isTrue();
+
+ // Test create existing empty branch with ignoreIfExists=false
+ assertThatThrownBy(() -> table.createBranch("empty-branch", false))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Branch name 'empty-branch' already exists."));
+ }
+
@Test
public void testDeleteBranch() throws Exception {
FileStoreTable table = createFileStoreTable();
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
index 0bf2292b14..a7a8fb9952 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
@@ -45,21 +45,32 @@ public class CreateBranchProcedure extends ProcedureBase {
public String[] call(
ProcedureContext procedureContext, String tableId, String
branchName, String tagName)
throws Catalog.TableNotExistException {
- return innerCall(tableId, branchName, tagName);
+ return innerCall(tableId, branchName, tagName, false);
}
public String[] call(ProcedureContext procedureContext, String tableId,
String branchName)
throws Catalog.TableNotExistException {
- return innerCall(tableId, branchName, null);
+ return innerCall(tableId, branchName, null, false);
}
- private String[] innerCall(String tableId, String branchName, String
tagName)
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String branchName,
+ String tagName,
+ Boolean ignoreIfExists)
+ throws Catalog.TableNotExistException {
+ return innerCall(tableId, branchName, tagName, ignoreIfExists != null
&& ignoreIfExists);
+ }
+
+ private String[] innerCall(
+ String tableId, String branchName, String tagName, boolean
ignoreIfExists)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
if (!StringUtils.isBlank(tagName)) {
- table.createBranch(branchName, tagName);
+ table.createBranch(branchName, tagName, ignoreIfExists);
} else {
- table.createBranch(branchName);
+ table.createBranch(branchName, ignoreIfExists);
}
return new String[] {"Success"};
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java
index bfa765dd75..0d681cae73 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java
@@ -26,24 +26,27 @@ import java.util.Map;
public class CreateBranchAction extends TableActionBase implements LocalAction
{
private final String branchName;
private final String tagName;
+ private final boolean ignoreIfExists;
public CreateBranchAction(
String databaseName,
String tableName,
Map<String, String> catalogConfig,
String branchName,
- String tagName) {
+ String tagName,
+ boolean ignoreIfExists) {
super(databaseName, tableName, catalogConfig);
this.branchName = branchName;
this.tagName = tagName;
+ this.ignoreIfExists = ignoreIfExists;
}
@Override
public void executeLocally() {
if (!StringUtils.isBlank(tagName)) {
- table.createBranch(branchName, tagName);
+ table.createBranch(branchName, tagName, ignoreIfExists);
} else {
- table.createBranch(branchName);
+ table.createBranch(branchName, ignoreIfExists);
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java
index 69dc1b3792..9ff1ef7896 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java
@@ -27,6 +27,7 @@ public class CreateBranchActionFactory implements ActionFactory {
private static final String TAG_NAME = "tag_name";
private static final String BRANCH_NAME = "branch_name";
+ private static final String IGNORE_IF_EXISTS = "ignore_if_exists";
@Override
public String identifier() {
@@ -41,7 +42,8 @@ public class CreateBranchActionFactory implements ActionFactory {
params.getRequired(TABLE),
catalogConfigMap(params),
params.getRequired(BRANCH_NAME),
- params.get(TAG_NAME));
+ params.get(TAG_NAME),
+ params.getBoolean(IGNORE_IF_EXISTS, false));
return Optional.of(action);
}
@@ -57,7 +59,8 @@ public class CreateBranchActionFactory implements ActionFactory {
+ "--database <database_name> \\\n"
+ "--table <table_name> \\\n"
+ "--branch_name <branch_name> \\\n"
- + "[--tag_name <tag_name>]");
+ + "[--tag_name <tag_name>] \\\n"
+ + "[--ignore_if_exists <true/false>]");
System.out.println();
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
index 897f2d6bda..90cb5f8cd9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
@@ -33,7 +33,10 @@ import org.apache.flink.table.procedure.ProcedureContext;
* Create branch procedure for given tag. Usage:
*
* <pre><code>
+ * CALL sys.create_branch('tableId', 'branchName')
* CALL sys.create_branch('tableId', 'branchName', 'tagName')
+ * CALL sys.create_branch('tableId', 'branchName', true)
+ * CALL sys.create_branch('tableId', 'branchName', 'tagName', true)
* </code></pre>
*/
public class CreateBranchProcedure extends ProcedureBase {
@@ -45,6 +48,44 @@ public class CreateBranchProcedure extends ProcedureBase {
return IDENTIFIER;
}
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "branch", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "tag", type = @DataTypeHint("STRING"),
isOptional = true),
+ @ArgumentHint(
+ name = "ignoreIfExists",
+ type = @DataTypeHint("BOOLEAN"),
+ isOptional = true)
+ })
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String branchName,
+ String tagName,
+ Boolean ignoreIfExists)
+ throws Catalog.TableNotExistException {
+ return innerCall(tableId, branchName, tagName, ignoreIfExists);
+ }
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "branch", type = @DataTypeHint("STRING")),
+ @ArgumentHint(
+ name = "ignoreIfExists",
+ type = @DataTypeHint("BOOLEAN"),
+ isOptional = true)
+ })
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String branchName,
+ Boolean ignoreIfExists)
+ throws Catalog.TableNotExistException {
+ return innerCall(tableId, branchName, null, ignoreIfExists);
+ }
+
@ProcedureHint(
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@@ -54,11 +95,28 @@ public class CreateBranchProcedure extends ProcedureBase {
public String[] call(
ProcedureContext procedureContext, String tableId, String
branchName, String tagName)
throws Catalog.TableNotExistException {
+ return innerCall(tableId, branchName, tagName, false);
+ }
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "branch", type = @DataTypeHint("STRING"))
+ })
+ public String[] call(ProcedureContext procedureContext, String tableId,
String branchName)
+ throws Catalog.TableNotExistException {
+ return innerCall(tableId, branchName, null, false);
+ }
+
+ private String[] innerCall(
+ String tableId, String branchName, String tagName, Boolean
ignoreIfExists)
+ throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
+ boolean ignore = ignoreIfExists != null && ignoreIfExists;
if (!StringUtils.isBlank(tagName)) {
- table.createBranch(branchName, tagName);
+ table.createBranch(branchName, tagName, ignore);
} else {
- table.createBranch(branchName);
+ table.createBranch(branchName, ignore);
}
return new String[] {"Success"};
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index e2387a8c38..c0ece12168 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -30,6 +30,7 @@ import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -87,7 +88,7 @@ public class BranchActionITCase extends ActionITCaseBase {
executeSQL(
String.format(
- "CALL sys.create_branch(`table` => '%s.%s', branch =>
'branch_name_named_argument', tag => 'tag2')",
+ "CALL sys.create_branch('%s.%s',
'branch_name_named_argument', 'tag2')",
database, tableName));
assertThat(branchManager.branchExists("branch_name_named_argument")).isTrue();
@@ -168,7 +169,7 @@ public class BranchActionITCase extends ActionITCaseBase {
executeSQL(
String.format(
- "CALL sys.create_branch(`table` => '%s.%s', branch =>
'empty_branch_named_argument')",
+ "CALL sys.create_branch('%s.%s',
'empty_branch_named_argument')",
database, tableName));
assertThat(branchManager.branchExists("empty_branch_named_argument")).isTrue();
@@ -213,6 +214,170 @@ public class BranchActionITCase extends ActionITCaseBase {
assertThat(branchManager.branchExists("empty_branch_name")).isFalse();
}
+ @Test
+ void testCreateBranchWithIgnoreIfExists() throws Exception {
+ init(warehouse);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(),
DataTypes.STRING()},
+ new String[] {"k", "v"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.emptyList(),
+ Collections.emptyMap());
+
+ StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ // 3 snapshots
+ writeData(rowData(1L, BinaryString.fromString("Hi")));
+ writeData(rowData(2L, BinaryString.fromString("Hello")));
+ writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+ TagManager tagManager = new TagManager(table.fileIO(),
table.location());
+ executeSQL(String.format("CALL sys.create_tag('%s.%s', 'tag1', 1)",
database, tableName));
+ assertThat(tagManager.tagExists("tag1")).isTrue();
+
+ BranchManager branchManager = table.branchManager();
+
+ // Create branch without ignoreIfExists
+ executeSQL(
+ String.format(
+ "CALL sys.create_branch('%s.%s', 'branch_if_exists',
'tag1')",
+ database, tableName));
+ assertThat(branchManager.branchExists("branch_if_exists")).isTrue();
+
+ // Try to create the same branch again without ignoreIfExists - should
throw exception
+ try {
+ executeSQL(
+ String.format(
+ "CALL sys.create_branch('%s.%s',
'branch_if_exists', 'tag1')",
+ database, tableName));
+ Assertions.fail("Expected exception not thrown");
+ } catch (Exception e) {
+ assertThat(e.getMessage()).contains("already exists");
+ }
+
+ // Create branch with ignoreIfExists=true - should succeed silently
+ executeSQL(
+ String.format(
+ "CALL sys.create_branch('%s.%s', 'branch_if_exists',
'tag1', true)",
+ database, tableName));
+ assertThat(branchManager.branchExists("branch_if_exists")).isTrue();
+
+ // Create branch with ignoreIfExists=false - should throw exception
+ try {
+ executeSQL(
+ String.format(
+ "CALL sys.create_branch('%s.%s',
'branch_if_exists', 'tag1', false)",
+ database, tableName));
+ Assertions.fail("Expected exception not thrown");
+ } catch (Exception e) {
+ assertThat(e.getMessage()).contains("already exists");
+ }
+
+ // Test with action API
+ createAction(
+ CreateBranchAction.class,
+ "create_branch",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--branch_name",
+ "branch_if_exists",
+ "--tag_name",
+ "tag1",
+ "--ignore_if_exists",
+ "true")
+ .run();
+ assertThat(branchManager.branchExists("branch_if_exists")).isTrue();
+
+ // Test creating new branch with ignoreIfExists=true (branch doesn't
exist yet)
+ executeSQL(
+ String.format(
+ "CALL sys.create_branch('%s.%s', 'new_branch', 'tag1',
true)",
+ database, tableName));
+ assertThat(branchManager.branchExists("new_branch")).isTrue();
+
+ // Test creating new branch with ignoreIfExists=false (branch doesn't
exist yet)
+ executeSQL(
+ String.format(
+ "CALL sys.create_branch('%s.%s', 'another_branch',
'tag1', false)",
+ database, tableName));
+ assertThat(branchManager.branchExists("another_branch")).isTrue();
+ }
+
+ @Test
+ void testCreateEmptyBranchWithIgnoreIfExists() throws Exception {
+ init(warehouse);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(),
DataTypes.STRING()},
+ new String[] {"k", "v"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.emptyList(),
+ Collections.emptyMap());
+
+ StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ // 3 snapshots
+ writeData(rowData(1L, BinaryString.fromString("Hi")));
+ writeData(rowData(2L, BinaryString.fromString("Hello")));
+ writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+ BranchManager branchManager = table.branchManager();
+
+ // Create empty branch
+ executeSQL(
+ String.format(
+ "CALL sys.create_branch('%s.%s', 'empty_branch')",
database, tableName));
+ assertThat(branchManager.branchExists("empty_branch")).isTrue();
+
+ // Try to create the same empty branch again without ignoreIfExists -
should throw exception
+ try {
+ executeSQL(
+ String.format(
+ "CALL sys.create_branch('%s.%s', 'empty_branch')",
+ database, tableName));
+ Assertions.fail("Expected exception not thrown");
+ } catch (Exception e) {
+ assertThat(e.getMessage()).contains("already exists");
+ }
+
+ // Create empty branch with ignoreIfExists=true - should succeed
silently
+ executeSQL(
+ String.format(
+ "CALL sys.create_branch('%s.%s', 'empty_branch',
true)",
+ database, tableName));
+ assertThat(branchManager.branchExists("empty_branch")).isTrue();
+
+ // Create empty branch with ignoreIfExists=false - should throw
exception
+ try {
+ executeSQL(
+ String.format(
+ "CALL sys.create_branch('%s.%s', 'empty_branch',
false)",
+ database, tableName));
+ Assertions.fail("Expected exception not thrown");
+ } catch (Exception e) {
+ assertThat(e.getMessage()).contains("already exists");
+ }
+ }
+
@ParameterizedTest
@ValueSource(booleans = {false, true})
void testFastForward(boolean forceStartFlinkJob) throws Exception {
diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
index c08cff88e1..99b03327ae 100644
--- a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
+++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
@@ -64,6 +64,14 @@ public class MultipleParameterToolAdapter {
return value;
}
+ public Boolean getBoolean(String key, Boolean defaultValue) {
+ String value = get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ return Boolean.parseBoolean(value);
+ }
+
public MultipleParameterToolAdapter mergeWith(MultipleParameterToolAdapter
other) {
return new
MultipleParameterToolAdapter(this.params.mergeWith(other.params));
}
diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
index b545748195..d140c15e96 100644
--- a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
+++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
@@ -64,6 +64,14 @@ public class MultipleParameterToolAdapter {
return value;
}
+ public Boolean getBoolean(String key, Boolean defaultValue) {
+ String value = get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ return Boolean.parseBoolean(value);
+ }
+
public MultipleParameterToolAdapter mergeWith(MultipleParameterToolAdapter
other) {
return new
MultipleParameterToolAdapter(this.params.mergeWith(other.params));
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateBranchProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateBranchProcedure.java
index 2a0e0be02b..2953d596e1 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateBranchProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateBranchProcedure.java
@@ -35,7 +35,8 @@ public class CreateBranchProcedure extends BaseProcedure {
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
ProcedureParameter.required("branch", StringType),
- ProcedureParameter.optional("tag", StringType)
+ ProcedureParameter.optional("tag", StringType),
+ ProcedureParameter.optional("ignoreIfExists", StringType)
};
private static final StructType OUTPUT_TYPE =
@@ -63,14 +64,15 @@ public class CreateBranchProcedure extends BaseProcedure {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
String branch = args.getString(1);
String tag = args.isNullAt(2) ? null : args.getString(2);
+ boolean ignoreIfExists = !args.isNullAt(3) &&
Boolean.parseBoolean(args.getString(3));
return modifyPaimonTable(
tableIdent,
table -> {
if (tag != null) {
- table.createBranch(branch, tag);
+ table.createBranch(branch, tag, ignoreIfExists);
} else {
- table.createBranch(branch);
+ table.createBranch(branch, ignoreIfExists);
}
InternalRow outputRow = newInternalRow(true);