This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 312ce5f35 [core] Remove the feature that creates branches from 
snapshots. (#3934)
312ce5f35 is described below

commit 312ce5f35aebf76a0056610ef962cdc81b60349d
Author: HunterXHunter <[email protected]>
AuthorDate: Sun Aug 11 22:01:23 2024 +0800

    [core] Remove the feature that creates branches from snapshots. (#3934)
---
 docs/content/flink/procedures.md                   |  6 +-
 docs/content/maintenance/manage-branches.md        |  4 -
 docs/content/spark/procedures.md                   |  2 -
 .../paimon/privilege/PrivilegedFileStoreTable.java |  6 --
 .../paimon/table/AbstractFileStoreTable.java       |  5 --
 .../paimon/table/DelegatedFileStoreTable.java      |  5 --
 .../org/apache/paimon/table/ReadonlyTable.java     |  8 --
 .../main/java/org/apache/paimon/table/Table.java   |  4 -
 .../org/apache/paimon/utils/BranchManager.java     | 34 --------
 .../paimon/flink/action/CreateBranchAction.java    |  8 +-
 .../flink/action/CreateBranchActionFactory.java    | 11 +--
 .../flink/procedure/CreateBranchProcedure.java     | 14 +---
 .../org/apache/paimon/flink/BranchSqlITCase.java   | 57 +++++---------
 .../paimon/flink/action/BranchActionITCase.java    | 92 ----------------------
 .../paimon/flink/action/ConsumerActionITCase.java  |  3 +-
 .../spark/procedure/CreateBranchProcedure.java     |  7 +-
 .../CreateAndDeleteBranchProcedureTest.scala       | 29 -------
 17 files changed, 29 insertions(+), 266 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 3fae9b405..6209791b8 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -308,22 +308,18 @@ All available procedures are listed below.
    <tr>
       <td>create_branch</td>
       <td>
-         -- based on the specified snapshot <br/>
-         CALL [catalog.]sys.create_branch('identifier', 'branchName', 
snapshotId) <br/>
          -- based on the specified tag <br/>
          CALL [catalog.]sys.create_branch('identifier', 'branchName', 
'tagName')
          -- create empty branch <br/>
          CALL [catalog.]sys.create_branch('identifier', 'branchName')
       </td>
       <td>
-         To create a branch based on given snapshot / tag, or just create 
empty branch. Arguments:
+         To create a branch based on given tag, or just create empty branch. 
Arguments:
             <li>identifier: the target table identifier. Cannot be empty.</li>
             <li>branchName: name of the new branch.</li>
-            <li>snapshotId (Long): id of the snapshot which the new branch is 
based on.</li>
             <li>tagName: name of the tag which the new branch is based on.</li>
       </td>
       <td>
-         CALL sys.create_branch('default.T', 'branch1', 10)<br/><br/>
          CALL sys.create_branch('default.T', 'branch1', 'tag1')<br/><br/>
          CALL sys.create_branch('default.T', 'branch1')<br/><br/>
       </td>
diff --git a/docs/content/maintenance/manage-branches.md 
b/docs/content/maintenance/manage-branches.md
index 22ca9b850..628caf1b5 100644
--- a/docs/content/maintenance/manage-branches.md
+++ b/docs/content/maintenance/manage-branches.md
@@ -46,9 +46,6 @@ Run the following sql:
 -- create branch named 'branch1' from tag 'tag1'
 CALL sys.create_branch('default.T', 'branch1', 'tag1');
 
--- create branch named 'branch1' from snapshot 1
-CALL sys.create_branch('default.T', 'branch1', 1);
-
 -- create empty branch named 'branch1'
 CALL sys.create_branch('default.T', 'branch1');
 ```
@@ -67,7 +64,6 @@ Run the following command:
     --table <table-name> \
     --branch_name <branch-name> \
     [--tag_name <tag-name>] \
-    [--snapshot <snapshot_id>] \
     [--catalog_conf <paimon-catalog-conf> [--catalog_conf 
<paimon-catalog-conf> ...]]
 ```
 {{< /tab >}}
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index c3b6292c7..9a53a79ee 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -171,12 +171,10 @@ This section introduce all available spark procedures 
about paimon.
             <li>table: the target table identifier. Cannot be empty.</li>
             <li>branch: name of the branch to be merged.</li>
             <li>tag: name of the new tag. Cannot be empty.</li>
-            <li>snapshot(Long):  id of the snapshot which the new tag is based 
on.</li>
       </td>
       <td>
           CALL sys.create_branch(table => 'test_db.T', branch => 
'test_branch')<br/><br/>
           CALL sys.create_branch(table => 'test_db.T', branch => 
'test_branch', tag => 'my_tag')<br/><br/>
-          CALL sys.create_branch(table => 'test_db.T', branch => 
'test_branch', snapshot => 10)
       </td>
     </tr>
     <tr>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index d590eb370..f412d4e5e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -145,12 +145,6 @@ public class PrivilegedFileStoreTable extends 
DelegatedFileStoreTable {
         wrapped.createBranch(branchName);
     }
 
-    @Override
-    public void createBranch(String branchName, long snapshotId) {
-        privilegeChecker.assertCanInsert(identifier);
-        wrapped.createBranch(branchName, snapshotId);
-    }
-
     @Override
     public void createBranch(String branchName, String tagName) {
         privilegeChecker.assertCanInsert(identifier);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 6e3c79d4d..04f5f6dd6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -569,11 +569,6 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         branchManager().createBranch(branchName);
     }
 
-    @Override
-    public void createBranch(String branchName, long snapshotId) {
-        branchManager().createBranch(branchName, snapshotId);
-    }
-
     @Override
     public void createBranch(String branchName, String tagName) {
         branchManager().createBranch(branchName, tagName);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 243ffb754..1ff64f2c1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -168,11 +168,6 @@ public abstract class DelegatedFileStoreTable implements 
FileStoreTable {
         wrapped.createBranch(branchName);
     }
 
-    @Override
-    public void createBranch(String branchName, long snapshotId) {
-        wrapped.createBranch(branchName, snapshotId);
-    }
-
     @Override
     public void createBranch(String branchName, String tagName) {
         wrapped.createBranch(branchName, tagName);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index b9eeba398..4854f983d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -176,14 +176,6 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
-    @Override
-    default void createBranch(String branchName, long snapshotId) {
-        throw new UnsupportedOperationException(
-                String.format(
-                        "Readonly Table %s does not support createBranch with 
snapshotId.",
-                        this.getClass().getSimpleName()));
-    }
-
     @Override
     default void createBranch(String branchName, String tagName) {
         throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java 
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 62207f882..55cf25aea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -116,10 +116,6 @@ public interface Table extends Serializable {
     @Experimental
     void createBranch(String branchName);
 
-    /** Create a branch from given snapshot. */
-    @Experimental
-    void createBranch(String branchName, long snapshotId);
-
     /** Create a branch from given tag. */
     @Experimental
     void createBranch(String branchName, String tagName);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index af598587c..0e905cd68 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -125,40 +125,6 @@ public class BranchManager {
         }
     }
 
-    public void createBranch(String branchName, long snapshotId) {
-        checkArgument(
-                !isMainBranch(branchName),
-                String.format(
-                        "Branch name '%s' is the default branch and cannot be 
used.",
-                        DEFAULT_MAIN_BRANCH));
-        checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is 
blank.", branchName);
-        checkArgument(!branchExists(branchName), "Branch name '%s' already 
exists.", branchName);
-        checkArgument(
-                !branchName.chars().allMatch(Character::isDigit),
-                "Branch name cannot be pure numeric string but is '%s'.",
-                branchName);
-
-        Snapshot snapshot = snapshotManager.snapshot(snapshotId);
-
-        try {
-            // Copy the corresponding snapshot and schema files into the 
branch directory
-            fileIO.copyFile(
-                    snapshotManager.snapshotPath(snapshotId),
-                    
snapshotManager.copyWithBranch(branchName).snapshotPath(snapshot.id()),
-                    true);
-            fileIO.copyFile(
-                    schemaManager.toSchemaPath(snapshot.schemaId()),
-                    
schemaManager.copyWithBranch(branchName).toSchemaPath(snapshot.schemaId()),
-                    true);
-        } catch (IOException e) {
-            throw new RuntimeException(
-                    String.format(
-                            "Exception occurs when create branch '%s' 
(directory in %s).",
-                            branchName, branchPath(tablePath, branchName)),
-                    e);
-        }
-    }
-
     public void createBranch(String branchName, String tagName) {
         checkArgument(
                 !isMainBranch(branchName),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java
index 504f493ef..aa8cc697a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java
@@ -27,28 +27,22 @@ public class CreateBranchAction extends TableActionBase {
     private final String branchName;
     private final String tagName;
 
-    private final Long snapshotId;
-
     public CreateBranchAction(
             String warehouse,
             String databaseName,
             String tableName,
             Map<String, String> catalogConfig,
             String branchName,
-            String tagName,
-            Long snapshotId) {
+            String tagName) {
         super(warehouse, databaseName, tableName, catalogConfig);
         this.branchName = branchName;
         this.tagName = tagName;
-        this.snapshotId = snapshotId;
     }
 
     @Override
     public void run() throws Exception {
         if (!StringUtils.isBlank(tagName)) {
             table.createBranch(branchName, tagName);
-        } else if (snapshotId != null) {
-            table.createBranch(branchName, snapshotId);
         } else {
             table.createBranch(branchName);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java
index 2a093e16d..d1071d087 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java
@@ -30,7 +30,6 @@ public class CreateBranchActionFactory implements 
ActionFactory {
 
     private static final String TAG_NAME = "tag_name";
     private static final String BRANCH_NAME = "branch_name";
-    private static final String SNAPSHOT = "snapshot";
 
     @Override
     public String identifier() {
@@ -44,11 +43,6 @@ public class CreateBranchActionFactory implements 
ActionFactory {
         Tuple3<String, String, String> tablePath = getTablePath(params);
         Map<String, String> catalogConfig = optionalConfigMap(params, 
CATALOG_CONF);
 
-        Long snapshot = null;
-        if (params.has(SNAPSHOT)) {
-            snapshot = Long.parseLong(params.get(SNAPSHOT));
-        }
-
         String tagName = null;
         if (params.has(TAG_NAME)) {
             tagName = params.get(TAG_NAME);
@@ -63,8 +57,7 @@ public class CreateBranchActionFactory implements 
ActionFactory {
                         tablePath.f2,
                         catalogConfig,
                         branchName,
-                        tagName,
-                        snapshot);
+                        tagName);
         return Optional.of(action);
     }
 
@@ -76,7 +69,7 @@ public class CreateBranchActionFactory implements 
ActionFactory {
         System.out.println("Syntax:");
         System.out.println(
                 "  create_branch --warehouse <warehouse_path> --database 
<database_name> "
-                        + "--table <table_name> --branch_name <branch_name> 
[--tag_name <tag_name>] [--snapshot <snapshot_id>]");
+                        + "--table <table_name> --branch_name <branch_name> 
[--tag_name <tag_name>]");
         System.out.println();
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
index 3d8ae49cc..093505923 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
@@ -44,27 +44,19 @@ public class CreateBranchProcedure extends ProcedureBase {
     public String[] call(
             ProcedureContext procedureContext, String tableId, String 
branchName, String tagName)
             throws Catalog.TableNotExistException {
-        return innerCall(tableId, branchName, tagName, 0);
+        return innerCall(tableId, branchName, tagName);
     }
 
     public String[] call(ProcedureContext procedureContext, String tableId, 
String branchName)
             throws Catalog.TableNotExistException {
-        return innerCall(tableId, branchName, null, 0);
+        return innerCall(tableId, branchName, null);
     }
 
-    public String[] call(
-            ProcedureContext procedureContext, String tableId, String 
branchName, long snapshotId)
-            throws Catalog.TableNotExistException {
-        return innerCall(tableId, branchName, null, snapshotId);
-    }
-
-    private String[] innerCall(String tableId, String branchName, String 
tagName, long snapshotId)
+    private String[] innerCall(String tableId, String branchName, String 
tagName)
             throws Catalog.TableNotExistException {
         Table table = catalog.getTable(Identifier.fromString(tableId));
         if (!StringUtils.isBlank(tagName)) {
             table.createBranch(branchName, tagName);
-        } else if (snapshotId > 0) {
-            table.createBranch(branchName, snapshotId);
         } else {
             table.createBranch(branchName);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 33aca03b8..ba197abfa 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -54,7 +54,9 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                         + " (2, 10, 'cat'),"
                         + " (2, 20, 'dog')");
 
-        sql("CALL sys.create_branch('default.T', 'test', 1)");
+        sql("CALL sys.create_tag('default.T', 'tag1', 1)");
+
+        sql("CALL sys.create_branch('default.T', 'test', 'tag1')");
 
         FileStoreTable branchTable = paimonTable("T$branch_test");
         assertThat(branchTable.schema().fields().size()).isEqualTo(3);
@@ -134,37 +136,6 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                         "+I[2, 20, dog]");
     }
 
-    @Test
-    public void testCreateBranchFromSnapshot() throws Exception {
-        sql(
-                "CREATE TABLE T ("
-                        + " pt INT"
-                        + ", k INT"
-                        + ", v STRING"
-                        + ", PRIMARY KEY (pt, k) NOT ENFORCED"
-                        + " ) PARTITIONED BY (pt) WITH ("
-                        + " 'bucket' = '2'"
-                        + " )");
-
-        // snapshot 1.
-        sql("INSERT INTO T VALUES(1, 10, 'apple')");
-
-        // snapshot 2.
-        sql("INSERT INTO T VALUES(1, 20, 'dog')");
-
-        sql("CALL sys.create_branch('default.T', 'test', 1)");
-        sql("CALL sys.create_branch('default.T', 'test2', 2)");
-
-        assertThat(collectResult("SELECT created_from_snapshot FROM 
`T$branches`"))
-                .containsExactlyInAnyOrder("+I[1]", "+I[2]");
-
-        
assertThat(paimonTable("T$branch_test").snapshotManager().snapshotExists(1))
-                .isEqualTo(true);
-
-        
assertThat(paimonTable("T$branch_test2").snapshotManager().snapshotExists(2))
-                .isEqualTo(true);
-    }
-
     @Test
     public void testCreateEmptyBranch() throws Exception {
         sql(
@@ -213,8 +184,12 @@ public class BranchSqlITCase extends CatalogITCaseBase {
         // snapshot 2.
         sql("INSERT INTO T VALUES(1, 20, 'dog')");
 
-        sql("CALL sys.create_branch('default.T', 'test', 1)");
-        sql("CALL sys.create_branch('default.T', 'test2', 2)");
+        sql("CALL sys.create_tag('default.T', 'tag1', 1)");
+
+        sql("CALL sys.create_tag('default.T', 'tag2', 2)");
+
+        sql("CALL sys.create_branch('default.T', 'test', 'tag1')");
+        sql("CALL sys.create_branch('default.T', 'test2', 'tag2')");
 
         assertThat(collectResult("SELECT branch_name, created_from_snapshot 
FROM `T$branches`"))
                 .containsExactlyInAnyOrder("+I[test, 1]", "+I[test2, 2]");
@@ -244,9 +219,13 @@ public class BranchSqlITCase extends CatalogITCaseBase {
         FileStoreTable table = paimonTable("T");
         checkSnapshots(table.snapshotManager(), 1, 3);
 
-        sql("CALL sys.create_branch('default.T', 'test1', 1)");
-        sql("CALL sys.create_branch('default.T', 'test2', 2)");
-        sql("CALL sys.create_branch('default.T', 'test3', 3)");
+        sql("CALL sys.create_tag('default.T', 'tag1', 1)");
+        sql("CALL sys.create_tag('default.T', 'tag2', 2)");
+        sql("CALL sys.create_tag('default.T', 'tag3', 3)");
+
+        sql("CALL sys.create_branch('default.T', 'test1', 'tag1')");
+        sql("CALL sys.create_branch('default.T', 'test2', 'tag2')");
+        sql("CALL sys.create_branch('default.T', 'test3', 'tag3')");
 
         assertThat(collectResult("SELECT created_from_snapshot FROM 
`T$branches`"))
                 .containsExactlyInAnyOrder("+I[1]", "+I[2]", "+I[3]");
@@ -277,7 +256,9 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                 .containsExactlyInAnyOrder(
                         "+I[1, 10, hunter]", "+I[1, 20, hunter]", "+I[1, 30, 
hunter]");
 
-        sql("CALL sys.create_branch('default.T', 'test', 1)");
+        sql("CALL sys.create_tag('default.T', 'tag1', 1)");
+
+        sql("CALL sys.create_branch('default.T', 'test', 'tag1')");
 
         sql("INSERT INTO `T$branch_test` VALUES (2, 10, 'hunterX')");
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index a83e618cc..7b92f0e3c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -119,98 +119,6 @@ class BranchActionITCase extends ActionITCaseBase {
         assertThat(branchManager.branchExists("branch_name")).isFalse();
     }
 
-    @Test
-    void testCreateAndDeleteBranchWithSnapshotId() throws Exception {
-
-        init(warehouse);
-
-        RowType rowType =
-                RowType.of(
-                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
-                        new String[] {"k", "v"});
-        FileStoreTable table =
-                createFileStoreTable(
-                        rowType,
-                        Collections.emptyList(),
-                        Collections.singletonList("k"),
-                        Collections.emptyList(),
-                        Collections.emptyMap());
-
-        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
-        write = writeBuilder.newWrite();
-        commit = writeBuilder.newCommit();
-
-        // 3 snapshots
-        writeData(rowData(1L, BinaryString.fromString("Hi")));
-        writeData(rowData(2L, BinaryString.fromString("Hello")));
-        writeData(rowData(3L, BinaryString.fromString("Paimon")));
-
-        BranchManager branchManager = table.branchManager();
-
-        callProcedure(
-                String.format(
-                        "CALL sys.create_branch('%s.%s', 
'branch_name_with_snapshotId', 2)",
-                        database, tableName));
-        
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isTrue();
-
-        callProcedure(
-                String.format(
-                        "CALL sys.delete_branch('%s.%s', 
'branch_name_with_snapshotId')",
-                        database, tableName));
-        
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isFalse();
-
-        // create branch1 and branch3
-        callProcedure(
-                String.format(
-                        "CALL sys.create_branch('%s.%s', 
'branch_name_with_snapshotId_1', 1)",
-                        database, tableName));
-        
assertThat(branchManager.branchExists("branch_name_with_snapshotId_1")).isTrue();
-
-        callProcedure(
-                String.format(
-                        "CALL sys.create_branch('%s.%s', 
'branch_name_with_snapshotId_3', 3)",
-                        database, tableName));
-        
assertThat(branchManager.branchExists("branch_name_with_snapshotId_3")).isTrue();
-
-        // delete branch1 and branch3 batch
-        callProcedure(
-                String.format(
-                        "CALL sys.delete_branch('%s.%s', 
'branch_name_with_snapshotId_1,branch_name_with_snapshotId_3')",
-                        database, tableName));
-        
assertThat(branchManager.branchExists("branch_name_with_snapshotId_1")).isFalse();
-        
assertThat(branchManager.branchExists("branch_name_with_snapshotId_3")).isFalse();
-
-        createAction(
-                        CreateBranchAction.class,
-                        "create_branch",
-                        "--warehouse",
-                        warehouse,
-                        "--database",
-                        database,
-                        "--table",
-                        tableName,
-                        "--branch_name",
-                        "branch_name_with_snapshotId",
-                        "--snapshot",
-                        "2")
-                .run();
-        
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isTrue();
-
-        createAction(
-                        DeleteBranchAction.class,
-                        "delete_branch",
-                        "--warehouse",
-                        warehouse,
-                        "--database",
-                        database,
-                        "--table",
-                        tableName,
-                        "--branch_name",
-                        "branch_name_with_snapshotId")
-                .run();
-        
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isFalse();
-    }
-
     @Test
     void testCreateAndDeleteEmptyBranch() throws Exception {
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index 8b3968f95..9d507b52f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -146,8 +146,9 @@ public class ConsumerActionITCase extends ActionITCaseBase {
         writeData(rowData(2L, BinaryString.fromString("Hello")));
         writeData(rowData(3L, BinaryString.fromString("Paimon")));
 
+        table.createTag("tag", 3);
         String branchName = "b1";
-        table.createBranch("b1", 3);
+        table.createBranch("b1", "tag");
         String branchTableName = tableName + "$branch_b1";
 
         // use consumer streaming read table
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateBranchProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateBranchProcedure.java
index 45cfae2e0..2a0e0be02 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateBranchProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateBranchProcedure.java
@@ -26,7 +26,6 @@ import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
-import static org.apache.spark.sql.types.DataTypes.LongType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
 /** Spark procedure to create a branch. */
@@ -36,8 +35,7 @@ public class CreateBranchProcedure extends BaseProcedure {
             new ProcedureParameter[] {
                 ProcedureParameter.required("table", StringType),
                 ProcedureParameter.required("branch", StringType),
-                ProcedureParameter.optional("tag", StringType),
-                ProcedureParameter.optional("snapshot", LongType)
+                ProcedureParameter.optional("tag", StringType)
             };
 
     private static final StructType OUTPUT_TYPE =
@@ -65,15 +63,12 @@ public class CreateBranchProcedure extends BaseProcedure {
         Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
         String branch = args.getString(1);
         String tag = args.isNullAt(2) ? null : args.getString(2);
-        Long snapshot = args.isNullAt(3) ? null : args.getLong(3);
 
         return modifyPaimonTable(
                 tableIdent,
                 table -> {
                     if (tag != null) {
                         table.createBranch(branch, tag);
-                    } else if (snapshot != null) {
-                        table.createBranch(branch, snapshot);
                     } else {
                         table.createBranch(branch);
                     }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteBranchProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteBranchProcedureTest.scala
index 59a4b510f..9f9b0e3a9 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteBranchProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteBranchProcedureTest.scala
@@ -93,13 +93,6 @@ class CreateAndDeleteBranchProcedureTest extends 
PaimonSparkTestBase with Stream
               Row(true) :: Nil)
             assert(branchManager.branchExists("empty_branch"))
 
-            // create branch with snapshot
-            checkAnswer(
-              spark.sql(
-                "CALL paimon.sys.create_branch(table => 'test.T', branch => 
'snapshot_branch', snapshot => 2)"),
-              Row(true) :: Nil)
-            assert(branchManager.branchExists("snapshot_branch"))
-
             // delete branch
             checkAnswer(
               spark.sql(
@@ -107,28 +100,6 @@ class CreateAndDeleteBranchProcedureTest extends 
PaimonSparkTestBase with Stream
               Row(true) :: Nil)
             assert(!branchManager.branchExists("test_branch"))
 
-            // create branch with snapshot2
-            checkAnswer(
-              spark.sql(
-                "CALL paimon.sys.create_branch(table => 'test.T', branch => 
'snapshot_branch_2', snapshot => 2)"),
-              Row(true) :: Nil)
-            assert(branchManager.branchExists("snapshot_branch_2"))
-
-            // create branch with snapshot3
-            checkAnswer(
-              spark.sql(
-                "CALL paimon.sys.create_branch(table => 'test.T', branch => 
'snapshot_branch_3', snapshot => 3)"),
-              Row(true) :: Nil)
-            assert(branchManager.branchExists("snapshot_branch_3"))
-
-            // delete branch:snapshot_branch_2 and snapshot_branch_3
-            checkAnswer(
-              spark.sql(
-                "CALL paimon.sys.delete_branch(table => 'test.T', branch => 
'snapshot_branch_2,snapshot_branch_3')"),
-              Row(true) :: Nil)
-            assert(!branchManager.branchExists("snapshot_branch_2"))
-            assert(!branchManager.branchExists("snapshot_branch_3"))
-
           } finally {
             stream.stop()
           }

Reply via email to