This is an automated email from the ASF dual-hosted git repository.

liguojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b878fe5d [Feature] Support create a empty branch and create a branch based on snapshotId (#2938)
4b878fe5d is described below

commit 4b878fe5d6f6cf70b6c1a6acd1e158665bc30b95
Author: Xiaojian Sun <[email protected]>
AuthorDate: Wed Apr 24 16:02:28 2024 +0800

    [Feature] Support create a empty branch and create a branch based on snapshotId (#2938)
    
    * support create empty branch and create a branch based on snapshotId
---
 .../java/org/apache/paimon/branch/TableBranch.java | 14 ++++
 .../paimon/table/AbstractFileStoreTable.java       | 10 +++
 .../org/apache/paimon/table/ReadonlyTable.java     | 16 ++++
 .../main/java/org/apache/paimon/table/Table.java   |  8 ++
 .../org/apache/paimon/utils/BranchManager.java     | 90 ++++++++++++++++++++--
 .../flink/procedure/CreateBranchProcedure.java     | 24 +++++-
 .../services/org.apache.paimon.factories.Factory   |  2 +-
 .../paimon/flink/action/BranchActionITCase.java    | 80 +++++++++++++++++++
 8 files changed, 235 insertions(+), 9 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java b/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
index 4b24c866f..9b5b478fc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
+++ b/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
@@ -26,6 +26,20 @@ public class TableBranch {
 
     private final long createTime;
 
+    public TableBranch(String branchName, Long createdFromSnapshot, long 
createTime) {
+        this.branchName = branchName;
+        this.createdFromTag = null;
+        this.createdFromSnapshot = createdFromSnapshot;
+        this.createTime = createTime;
+    }
+
+    public TableBranch(String branchName, long createTime) {
+        this.branchName = branchName;
+        this.createdFromTag = null;
+        this.createdFromSnapshot = null;
+        this.createTime = createTime;
+    }
+
     public TableBranch(
             String branchName, String createdFromTag, Long 
createdFromSnapshot, long createTime) {
         this.branchName = branchName;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 1ca321d56..7d4a4a7c2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -524,6 +524,16 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
                         store().createTagCallbacks());
     }
 
+    @Override
+    public void createBranch(String branchName) {
+        branchManager().createBranch(branchName);
+    }
+
+    @Override
+    public void createBranch(String branchName, long snapshotId) {
+        branchManager().createBranch(branchName, snapshotId);
+    }
+
     @Override
     public void createBranch(String branchName, String tagName) {
         branchManager().createBranch(branchName, tagName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index be4976f10..42bea3f68 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -150,6 +150,22 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
+    @Override
+    default void createBranch(String branchName) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support create empty 
branch.",
+                        this.getClass().getSimpleName()));
+    }
+
+    @Override
+    default void createBranch(String branchName, long snapshotId) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support createBranch with 
snapshotId.",
+                        this.getClass().getSimpleName()));
+    }
+
     @Override
     default void createBranch(String branchName, String tagName) {
         throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 3650b773c..876908394 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -95,6 +95,14 @@ public interface Table extends Serializable {
     @Experimental
     void rollbackTo(String tagName);
 
+    /** Create a empty branch. */
+    @Experimental
+    void createBranch(String branchName);
+
+    /** Create a branch from given snapshot. */
+    @Experimental
+    void createBranch(String branchName, long snapshotId);
+
     /** Create a branch from given tag. */
     @Experimental
     void createBranch(String branchName, String tagName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 4656deb67..f3f06f892 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -23,6 +23,7 @@ import org.apache.paimon.branch.TableBranch;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 
@@ -33,6 +34,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.PriorityQueue;
 import java.util.SortedMap;
 import java.util.stream.Collectors;
@@ -82,6 +84,65 @@ public class BranchManager {
         return new Path(getBranchPath(tablePath, branchName));
     }
 
+    /** Create empty branch. */
+    public void createBranch(String branchName) {
+        checkArgument(
+                !branchName.equals(DEFAULT_MAIN_BRANCH),
+                String.format(
+                        "Branch name '%s' is the default branch and cannot be 
used.",
+                        DEFAULT_MAIN_BRANCH));
+        checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is 
blank.", branchName);
+        checkArgument(!branchExists(branchName), "Branch name '%s' already 
exists.", branchName);
+        checkArgument(
+                !branchName.chars().allMatch(Character::isDigit),
+                "Branch name cannot be pure numeric string but is '%s'.",
+                branchName);
+        try {
+            TableSchema latestSchema = schemaManager.latest().get();
+            fileIO.copyFileUtf8(
+                    schemaManager.toSchemaPath(latestSchema.id()),
+                    schemaManager.branchSchemaPath(branchName, 
latestSchema.id()));
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Exception occurs when create branch '%s' 
(directory in %s).",
+                            branchName, getBranchPath(tablePath, branchName)),
+                    e);
+        }
+    }
+
+    public void createBranch(String branchName, long snapshotId) {
+        checkArgument(
+                !branchName.equals(DEFAULT_MAIN_BRANCH),
+                String.format(
+                        "Branch name '%s' is the default branch and cannot be 
used.",
+                        DEFAULT_MAIN_BRANCH));
+        checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is 
blank.", branchName);
+        checkArgument(!branchExists(branchName), "Branch name '%s' already 
exists.", branchName);
+        checkArgument(
+                !branchName.chars().allMatch(Character::isDigit),
+                "Branch name cannot be pure numeric string but is '%s'.",
+                branchName);
+
+        Snapshot snapshot = snapshotManager.snapshot(snapshotId);
+
+        try {
+            // Copy the corresponding snapshot and schema files into the 
branch directory
+            fileIO.copyFileUtf8(
+                    snapshotManager.snapshotPath(snapshotId),
+                    snapshotManager.branchSnapshotPath(branchName, 
snapshot.id()));
+            fileIO.copyFileUtf8(
+                    schemaManager.toSchemaPath(snapshot.schemaId()),
+                    schemaManager.branchSchemaPath(branchName, 
snapshot.schemaId()));
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Exception occurs when create branch '%s' 
(directory in %s).",
+                            branchName, getBranchPath(tablePath, branchName)),
+                    e);
+        }
+    }
+
     public void createBranch(String branchName, String tagName) {
         checkArgument(
                 !branchName.equals(DEFAULT_MAIN_BRANCH),
@@ -170,15 +231,34 @@ public class BranchManager {
                     new 
PriorityQueue<>(Comparator.comparingLong(TableBranch::getCreateTime));
             for (Pair<Path, Long> path : paths) {
                 String branchName = 
path.getLeft().getName().substring(BRANCH_PREFIX.length());
+                Optional<TableSchema> tableSchema = 
schemaManager.latest(branchName);
+                if (!tableSchema.isPresent()) {
+                    // Support empty branch.
+                    pq.add(new TableBranch(branchName, path.getValue()));
+                    continue;
+                }
                 FileStoreTable branchTable =
                         FileStoreTableFactory.create(
                                 fileIO, new Path(getBranchPath(tablePath, 
branchName)));
+
                 SortedMap<Snapshot, List<String>> snapshotTags = 
branchTable.tagManager().tags();
-                checkArgument(!snapshotTags.isEmpty());
-                Snapshot snapshot = snapshotTags.firstKey();
-                List<String> tags = snapshotTags.get(snapshot);
-                checkArgument(tags.size() == 1);
-                pq.add(new TableBranch(branchName, tags.get(0), snapshot.id(), 
path.getValue()));
+                Long earliestSnapshotId = 
branchTable.snapshotManager().earliestSnapshotId();
+                if (snapshotTags.isEmpty()) {
+                    // Create based on snapshotId.
+                    pq.add(new TableBranch(branchName, earliestSnapshotId, 
path.getValue()));
+                } else {
+                    Snapshot snapshot = snapshotTags.firstKey();
+                    if (earliestSnapshotId == snapshot.id()) {
+                        List<String> tags = snapshotTags.get(snapshot);
+                        checkArgument(tags.size() == 1);
+                        pq.add(
+                                new TableBranch(
+                                        branchName, tags.get(0), 
snapshot.id(), path.getValue()));
+                    } else {
+                        // Create based on snapshotId.
+                        pq.add(new TableBranch(branchName, earliestSnapshotId, 
path.getValue()));
+                    }
+                }
             }
 
             List<TableBranch> branches = new ArrayList<>(pq.size());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
index b870f088b..3d8ae49cc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.Table;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.table.procedure.ProcedureContext;
 
 /**
@@ -43,13 +44,30 @@ public class CreateBranchProcedure extends ProcedureBase {
     public String[] call(
             ProcedureContext procedureContext, String tableId, String 
branchName, String tagName)
             throws Catalog.TableNotExistException {
-        return innerCall(tableId, branchName, tagName);
+        return innerCall(tableId, branchName, tagName, 0);
     }
 
-    private String[] innerCall(String tableId, String branchName, String 
tagName)
+    public String[] call(ProcedureContext procedureContext, String tableId, 
String branchName)
+            throws Catalog.TableNotExistException {
+        return innerCall(tableId, branchName, null, 0);
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext, String tableId, String 
branchName, long snapshotId)
+            throws Catalog.TableNotExistException {
+        return innerCall(tableId, branchName, null, snapshotId);
+    }
+
+    private String[] innerCall(String tableId, String branchName, String 
tagName, long snapshotId)
             throws Catalog.TableNotExistException {
         Table table = catalog.getTable(Identifier.fromString(tableId));
-        table.createBranch(branchName, tagName);
+        if (!StringUtils.isBlank(tagName)) {
+            table.createBranch(branchName, tagName);
+        } else if (snapshotId > 0) {
+            table.createBranch(branchName, snapshotId);
+        } else {
+            table.createBranch(branchName);
+        }
         return new String[] {"Success"};
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 04f2b7933..fe463161b 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -44,4 +44,4 @@ org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
 org.apache.paimon.flink.procedure.MigrateFileProcedure
 org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
 org.apache.paimon.flink.procedure.QueryServiceProcedure
-org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
+org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index 7a6361657..4f4f31496 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -80,4 +80,84 @@ class BranchActionITCase extends ActionITCaseBase {
                         "CALL sys.delete_branch('%s.%s', 'branch_name')", 
database, tableName));
         assertThat(branchManager.branchExists("branch_name")).isFalse();
     }
+
+    @Test
+    void testCreateAndDeleteBranchWithSnapshotId() throws Exception {
+
+        init(warehouse);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // 3 snapshots
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        BranchManager branchManager = table.branchManager();
+
+        callProcedure(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 
'branch_name_with_snapshotId', 2)",
+                        database, tableName));
+        
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isTrue();
+        branchManager.branches();
+
+        callProcedure(
+                String.format(
+                        "CALL sys.delete_branch('%s.%s', 
'branch_name_with_snapshotId')",
+                        database, tableName));
+        
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isFalse();
+    }
+
+    @Test
+    void testCreateAndDeleteEmptyBranch() throws Exception {
+
+        init(warehouse);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // 3 snapshots
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        BranchManager branchManager = table.branchManager();
+        callProcedure(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'empty_branch_name')",
+                        database, tableName));
+        assertThat(branchManager.branchExists("empty_branch_name")).isTrue();
+
+        callProcedure(
+                String.format(
+                        "CALL sys.delete_branch('%s.%s', 'empty_branch_name')",
+                        database, tableName));
+        assertThat(branchManager.branchExists("empty_branch_name")).isFalse();
+    }
 }

Reply via email to