This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f90c59d164 [rest] Implement SupportsBranches in REST Catalog (#5168)
f90c59d164 is described below

commit f90c59d16485476510512b45c04e9afc11105551
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Feb 27 11:33:39 2025 +0800

    [rest] Implement SupportsBranches in REST Catalog (#5168)
---
 .../apache/paimon/catalog/SupportsBranches.java    | 102 ++++++++++--
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  96 +++++++++--
 .../java/org/apache/paimon/rest/ResourcePaths.java |  31 +++-
 .../paimon/rest/requests/CreateBranchRequest.java  |  62 +++++++
 .../paimon/rest/requests/ForwardBranchRequest.java |  46 ++++++
 .../rest/responses/ErrorResponseResourceType.java  |   6 +-
 .../rest/responses/ListBranchesResponse.java       |  48 ++++++
 .../paimon/utils/FileSystemBranchManager.java      |   6 +
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  47 ++++++
 .../org/apache/paimon/rest/RESTCatalogTest.java    |  16 ++
 paimon-open-api/rest-catalog-open-api.yaml         | 181 +++++++++++++++++++++
 .../paimon/open/api/RESTCatalogController.java     |  80 +++++++++
 12 files changed, 689 insertions(+), 32 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/SupportsBranches.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/SupportsBranches.java
index f051038d40..950268d3c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/SupportsBranches.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/SupportsBranches.java
@@ -34,40 +34,116 @@ public interface SupportsBranches extends Catalog {
      * @param branch the branch name
      * @param fromTag from the tag
      * @throws TableNotExistException if the table in identifier doesn't exist
-     * @throws DatabaseNotExistException if the database in identifier doesn't 
exist
+     * @throws BranchAlreadyExistException if the branch already exists
+     * @throws TagNotExistException if the tag doesn't exist
      */
     void createBranch(Identifier identifier, String branch, @Nullable String 
fromTag)
-            throws TableNotExistException, DatabaseNotExistException;
+            throws TableNotExistException, BranchAlreadyExistException, 
TagNotExistException;
 
     /**
      * Drop the branch for this table.
      *
      * @param identifier path of the table, cannot be system or branch name.
      * @param branch the branch name
-     * @throws TableNotExistException if the table in identifier doesn't exist
-     * @throws DatabaseNotExistException if the database in identifier doesn't 
exist
+     * @throws BranchNotExistException if the branch doesn't exist
      */
-    void dropBranch(Identifier identifier, String branch)
-            throws TableNotExistException, DatabaseNotExistException;
+    void dropBranch(Identifier identifier, String branch) throws 
BranchNotExistException;
 
     /**
      * Fast-forward a branch to main branch.
      *
      * @param identifier path of the table, cannot be system or branch name.
      * @param branch the branch name
-     * @throws TableNotExistException if the table in identifier doesn't exist
-     * @throws DatabaseNotExistException if the database in identifier doesn't 
exist
+     * @throws BranchNotExistException if the branch doesn't exist
      */
-    void fastForward(Identifier identifier, String branch)
-            throws TableNotExistException, DatabaseNotExistException;
+    void fastForward(Identifier identifier, String branch) throws 
BranchNotExistException;
 
     /**
      * List all branches of the table.
      *
      * @param identifier path of the table, cannot be system or branch name.
      * @throws TableNotExistException if the table in identifier doesn't exist
-     * @throws DatabaseNotExistException if the database in identifier doesn't 
exist
      */
-    List<String> listBranches(Identifier identifier)
-            throws TableNotExistException, DatabaseNotExistException;
+    List<String> listBranches(Identifier identifier) throws 
TableNotExistException;
+
+    /** Exception for trying to create a branch that already exists. */
+    class BranchAlreadyExistException extends Exception {
+
+        private static final String MSG = "Branch %s in table %s already 
exists.";
+
+        private final Identifier identifier;
+        private final String branch;
+
+        public BranchAlreadyExistException(Identifier identifier, String 
branch) {
+            this(identifier, branch, null);
+        }
+
+        public BranchAlreadyExistException(Identifier identifier, String 
branch, Throwable cause) {
+            super(String.format(MSG, branch, identifier.getFullName()), cause);
+            this.identifier = identifier;
+            this.branch = branch;
+        }
+
+        public Identifier identifier() {
+            return identifier;
+        }
+
+        public String branch() {
+            return branch;
+        }
+    }
+
+    /** Exception for trying to operate on a branch that doesn't exist. */
+    class BranchNotExistException extends Exception {
+
+        private static final String MSG = "Branch %s in table %s doesn't 
exist.";
+
+        private final Identifier identifier;
+        private final String branch;
+
+        public BranchNotExistException(Identifier identifier, String branch) {
+            this(identifier, branch, null);
+        }
+
+        public BranchNotExistException(Identifier identifier, String branch, 
Throwable cause) {
+            super(String.format(MSG, branch, identifier.getFullName()), cause);
+            this.identifier = identifier;
+            this.branch = branch;
+        }
+
+        public Identifier identifier() {
+            return identifier;
+        }
+
+        public String branch() {
+            return branch;
+        }
+    }
+
+    /** Exception for trying to operate on a tag that doesn't exist. */
+    class TagNotExistException extends Exception {
+
+        private static final String MSG = "Tag %s in table %s doesn't exist.";
+
+        private final Identifier identifier;
+        private final String tag;
+
+        public TagNotExistException(Identifier identifier, String tag) {
+            this(identifier, tag, null);
+        }
+
+        public TagNotExistException(Identifier identifier, String tag, 
Throwable cause) {
+            super(String.format(MSG, tag, identifier.getFullName()), cause);
+            this.identifier = identifier;
+            this.tag = tag;
+        }
+
+        public Identifier identifier() {
+            return identifier;
+        }
+
+        public String tag() {
+            return tag;
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 141b22b45d..4fbf8dfa03 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.catalog.SupportsBranches;
 import org.apache.paimon.catalog.SupportsSnapshots;
 import org.apache.paimon.catalog.TableMetadata;
 import org.apache.paimon.fs.FileIO;
@@ -45,11 +46,13 @@ import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterPartitionsRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
 import org.apache.paimon.rest.requests.CommitTableRequest;
+import org.apache.paimon.rest.requests.CreateBranchRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
 import org.apache.paimon.rest.requests.CreatePartitionsRequest;
 import org.apache.paimon.rest.requests.CreateTableRequest;
 import org.apache.paimon.rest.requests.CreateViewRequest;
 import org.apache.paimon.rest.requests.DropPartitionsRequest;
+import org.apache.paimon.rest.requests.ForwardBranchRequest;
 import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
@@ -62,6 +65,7 @@ import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
 import org.apache.paimon.rest.responses.GetTableTokenResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
+import org.apache.paimon.rest.responses.ListBranchesResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
 import org.apache.paimon.rest.responses.ListPartitionsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
@@ -79,6 +83,8 @@ import org.apache.paimon.view.ViewSchema;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
@@ -103,7 +109,7 @@ import static 
org.apache.paimon.rest.auth.AuthSession.createAuthSession;
 import static 
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
 
 /** A catalog implementation for REST. */
-public class RESTCatalog implements Catalog, SupportsSnapshots {
+public class RESTCatalog implements Catalog, SupportsSnapshots, 
SupportsBranches {
 
     public static final String HEADER_PREFIX = "header.";
 
@@ -235,15 +241,11 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots {
             Set<String> removeKeys = setPropertiesToRemoveKeys.getRight();
             AlterDatabaseRequest request =
                     new AlterDatabaseRequest(new ArrayList<>(removeKeys), 
updateProperties);
-            AlterDatabaseResponse response =
-                    client.post(
-                            resourcePaths.databaseProperties(name),
-                            request,
-                            AlterDatabaseResponse.class,
-                            restAuthFunction);
-            //            if (response.getUpdated().isEmpty()) {
-            //                throw new IllegalStateException("Failed to 
update properties");
-            //            }
+            client.post(
+                    resourcePaths.databaseProperties(name),
+                    request,
+                    AlterDatabaseResponse.class,
+                    restAuthFunction);
         } catch (NoSuchResourceException e) {
             if (!ignoreIfNotExists) {
                 throw new DatabaseNotExistException(name);
@@ -578,6 +580,80 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots {
         }
     }
 
+    @Override
+    public void createBranch(Identifier identifier, String branch, @Nullable 
String fromTag)
+            throws TableNotExistException, BranchAlreadyExistException, 
TagNotExistException {
+        try {
+            CreateBranchRequest request = new CreateBranchRequest(branch, 
fromTag);
+            client.post(
+                    resourcePaths.branches(identifier.getDatabaseName(), 
identifier.getTableName()),
+                    request,
+                    restAuthFunction);
+        } catch (NoSuchResourceException e) {
+            if (e.resourceType() == ErrorResponseResourceType.TABLE) {
+                throw new TableNotExistException(identifier, e);
+            } else if (e.resourceType() == ErrorResponseResourceType.TAG) {
+                throw new TagNotExistException(identifier, fromTag, e);
+            } else {
+                throw e;
+            }
+        } catch (AlreadyExistsException e) {
+            throw new BranchAlreadyExistException(identifier, branch, e);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
+    @Override
+    public void dropBranch(Identifier identifier, String branch) throws 
BranchNotExistException {
+        try {
+            client.delete(
+                    resourcePaths.branch(
+                            identifier.getDatabaseName(), 
identifier.getTableName(), branch),
+                    restAuthFunction);
+        } catch (NoSuchResourceException e) {
+            throw new BranchNotExistException(identifier, branch, e);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
+    @Override
+    public void fastForward(Identifier identifier, String branch) throws 
BranchNotExistException {
+        try {
+            ForwardBranchRequest request = new ForwardBranchRequest(branch);
+            client.post(
+                    resourcePaths.forwardBranch(
+                            identifier.getDatabaseName(), 
identifier.getTableName()),
+                    request,
+                    restAuthFunction);
+        } catch (NoSuchResourceException e) {
+            throw new BranchNotExistException(identifier, branch, e);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
+    @Override
+    public List<String> listBranches(Identifier identifier) throws 
TableNotExistException {
+        try {
+            ListBranchesResponse response =
+                    client.get(
+                            resourcePaths.branches(
+                                    identifier.getDatabaseName(), 
identifier.getTableName()),
+                            ListBranchesResponse.class,
+                            restAuthFunction);
+            if (response == null || response.branches() == null) { // no body / no list => no branches
+                return Collections.emptyList();
+            }
+            return response.branches();
+        } catch (NoSuchResourceException e) {
+            throw new TableNotExistException(identifier, e); // chain the REST error as cause
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
+
     @Override
     public View getView(Identifier identifier) throws ViewNotExistException {
         try {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index 50d0a18c08..e733654901 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -29,6 +29,9 @@ public class ResourcePaths {
     private static final String V1 = "/v1";
     private static final String DATABASES = "databases";
     private static final String TABLES = "tables";
+    private static final String PARTITIONS = "partitions";
+    private static final String BRANCHES = "branches";
+    private static final String VIEWS = "views";
     public static final String QUERY_PARAMETER_WAREHOUSE_KEY = "warehouse";
 
     public static String config(String warehouse) {
@@ -82,33 +85,47 @@ public class ResourcePaths {
     }
 
     public String partitions(String databaseName, String tableName) {
-        return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, 
tableName, "partitions");
+        return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, 
tableName, PARTITIONS);
     }
 
     public String dropPartitions(String databaseName, String tableName) {
         return SLASH.join(
-                V1, prefix, DATABASES, databaseName, TABLES, tableName, 
"partitions", "drop");
+                V1, prefix, DATABASES, databaseName, TABLES, tableName, 
PARTITIONS, "drop");
     }
 
     public String alterPartitions(String databaseName, String tableName) {
         return SLASH.join(
-                V1, prefix, DATABASES, databaseName, TABLES, tableName, 
"partitions", "alter");
+                V1, prefix, DATABASES, databaseName, TABLES, tableName, 
PARTITIONS, "alter");
     }
 
     public String markDonePartitions(String databaseName, String tableName) {
         return SLASH.join(
-                V1, prefix, DATABASES, databaseName, TABLES, tableName, 
"partitions", "mark");
+                V1, prefix, DATABASES, databaseName, TABLES, tableName, 
PARTITIONS, "mark");
+    }
+
+    public String branches(String databaseName, String tableName) {
+        return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, 
tableName, BRANCHES);
+    }
+
+    public String branch(String databaseName, String tableName, String 
branchName) {
+        return SLASH.join(
+                V1, prefix, DATABASES, databaseName, TABLES, tableName, 
BRANCHES, branchName);
+    }
+
+    public String forwardBranch(String databaseName, String tableName) {
+        return SLASH.join(
+                V1, prefix, DATABASES, databaseName, TABLES, tableName, 
BRANCHES, "forward");
     }
 
     public String views(String databaseName) {
-        return SLASH.join(V1, prefix, DATABASES, databaseName, "views");
+        return SLASH.join(V1, prefix, DATABASES, databaseName, VIEWS);
     }
 
     public String view(String databaseName, String viewName) {
-        return SLASH.join(V1, prefix, DATABASES, databaseName, "views", 
viewName);
+        return SLASH.join(V1, prefix, DATABASES, databaseName, VIEWS, 
viewName);
     }
 
     public String renameView(String databaseName) {
-        return SLASH.join(V1, prefix, DATABASES, databaseName, "views", 
"rename");
+        return SLASH.join(V1, prefix, DATABASES, databaseName, VIEWS, 
"rename");
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreateBranchRequest.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreateBranchRequest.java
new file mode 100644
index 0000000000..394807d6ff
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreateBranchRequest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.rest.RESTRequest;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+/** Request for creating branch. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CreateBranchRequest implements RESTRequest {
+
+    private static final String FIELD_BRANCH = "branch";
+    private static final String FIELD_FROM_TAG = "fromTag";
+
+    @JsonProperty(FIELD_BRANCH)
+    private final String branch;
+
+    @Nullable
+    @JsonProperty(FIELD_FROM_TAG)
+    private final String fromTag;
+
+    @JsonCreator
+    public CreateBranchRequest(
+            @JsonProperty(FIELD_BRANCH) String branch,
+            @Nullable @JsonProperty(FIELD_FROM_TAG) String fromTag) {
+        this.branch = branch;
+        this.fromTag = fromTag;
+    }
+
+    @JsonGetter(FIELD_BRANCH)
+    public String branch() {
+        return branch;
+    }
+
+    @Nullable
+    @JsonGetter(FIELD_FROM_TAG)
+    public String fromTag() {
+        return fromTag;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/requests/ForwardBranchRequest.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/ForwardBranchRequest.java
new file mode 100644
index 0000000000..d8b6f3bf8c
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/ForwardBranchRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.rest.RESTRequest;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Request for forwarding branch. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ForwardBranchRequest implements RESTRequest {
+
+    private static final String FIELD_BRANCH = "branch";
+
+    @JsonProperty(FIELD_BRANCH)
+    private final String branch;
+
+    @JsonCreator
+    public ForwardBranchRequest(@JsonProperty(FIELD_BRANCH) String branch) {
+        this.branch = branch;
+    }
+
+    @JsonGetter(FIELD_BRANCH)
+    public String branch() {
+        return branch;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
index cb05c6c6c5..dc715303bd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java
@@ -23,6 +23,8 @@ public enum ErrorResponseResourceType {
     DATABASE,
     TABLE,
     COLUMN,
-    VIEW,
-    SNAPSHOT
+    SNAPSHOT,
+    BRANCH,
+    TAG,
+    VIEW
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListBranchesResponse.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListBranchesResponse.java
new file mode 100644
index 0000000000..ffb1bfe0ab
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListBranchesResponse.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.rest.RESTResponse;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** Response for listing branches. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ListBranchesResponse implements RESTResponse {
+
+    public static final String FIELD_BRANCHES = "branches";
+
+    @JsonProperty(FIELD_BRANCHES)
+    private final List<String> branches;
+
+    @JsonCreator
+    public ListBranchesResponse(@JsonProperty(FIELD_BRANCHES) List<String> 
branches) {
+        this.branches = branches;
+    }
+
+    @JsonGetter(FIELD_BRANCHES)
+    public List<String> branches() {
+        return branches;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
index 2c085fda8d..3d4dd7df43 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
@@ -150,6 +150,12 @@ public class FileSystemBranchManager implements 
BranchManager {
         checkArgument(branchExists(branchName), "Branch name '%s' doesn't 
exist.", branchName);
 
         Long earliestSnapshotId = 
snapshotManager.copyWithBranch(branchName).earliestSnapshotId();
+        if (earliestSnapshotId == null) {
+            throw new RuntimeException(
+                    "Cannot fast forward branch "
+                            + branchName
+                            + ", because it does not have snapshot.");
+        }
         Snapshot earliestSnapshot =
                 
snapshotManager.copyWithBranch(branchName).snapshot(earliestSnapshotId);
         long earliestSchemaId = earliestSnapshot.schemaId();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index fd84bf7924..15de6189c1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -36,11 +36,13 @@ import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterPartitionsRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
 import org.apache.paimon.rest.requests.CommitTableRequest;
+import org.apache.paimon.rest.requests.CreateBranchRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
 import org.apache.paimon.rest.requests.CreatePartitionsRequest;
 import org.apache.paimon.rest.requests.CreateTableRequest;
 import org.apache.paimon.rest.requests.CreateViewRequest;
 import org.apache.paimon.rest.requests.DropPartitionsRequest;
+import org.apache.paimon.rest.requests.ForwardBranchRequest;
 import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
@@ -53,6 +55,7 @@ import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
 import org.apache.paimon.rest.responses.GetTableTokenResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
+import org.apache.paimon.rest.responses.ListBranchesResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
 import org.apache.paimon.rest.responses.ListPartitionsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
@@ -62,6 +65,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewImpl;
 import org.apache.paimon.view.ViewSchema;
@@ -217,6 +221,11 @@ public class RESTCatalogServer {
                                         && "tables".equals(resources[1])
                                         && "partitions".equals(resources[3])
                                         && "mark".equals(resources[4]);
+
+                        boolean isBranches =
+                                resources.length >= 4
+                                        && "tables".equals(resources[1])
+                                        && "branches".equals(resources[3]);
                         if (isDropPartitions) {
                             String tableName = resources[2];
                             Identifier identifier = 
Identifier.create(databaseName, tableName);
@@ -271,6 +280,43 @@ public class RESTCatalogServer {
                                 return error.get();
                             }
                             return partitionsApiHandler(catalog, request, 
databaseName, tableName);
+                        } else if (isBranches) {
+                            String tableName = resources[2];
+                            Identifier identifier = 
Identifier.create(databaseName, tableName);
+                            FileStoreTable table = (FileStoreTable) 
catalog.getTable(identifier);
+                            BranchManager branchManager = 
table.branchManager();
+                            switch (request.getMethod()) {
+                                case "DELETE":
+                                    String branch = resources[4];
+                                    table.deleteBranch(branch);
+                                    return new 
MockResponse().setResponseCode(200);
+                                case "GET":
+                                    List<String> branches = 
branchManager.branches();
+                                    response = new 
ListBranchesResponse(branches);
+                                    return mockResponse(response, 200);
+                                case "POST":
+                                    if (resources.length == 5) {
+                                        ForwardBranchRequest requestBody =
+                                                OBJECT_MAPPER.readValue(
+                                                        
request.getBody().readUtf8(),
+                                                        
ForwardBranchRequest.class);
+                                        
branchManager.fastForward(requestBody.branch());
+                                    } else {
+                                        CreateBranchRequest requestBody =
+                                                OBJECT_MAPPER.readValue(
+                                                        
request.getBody().readUtf8(),
+                                                        
CreateBranchRequest.class);
+                                        if (requestBody.fromTag() == null) {
+                                            
branchManager.createBranch(requestBody.branch());
+                                        } else {
+                                            branchManager.createBranch(
+                                                    requestBody.branch(), 
requestBody.fromTag());
+                                        }
+                                    }
+                                    return new 
MockResponse().setResponseCode(200);
+                                default:
+                                    return new 
MockResponse().setResponseCode(404);
+                            }
                         } else if (isTableToken) {
                             RESTToken dataToken =
                                     
catalog.getToken(Identifier.create(databaseName, resources[2]));
@@ -393,6 +439,7 @@ public class RESTCatalogServer {
                     response = new ErrorResponse(null, null, e.getMessage(), 
400);
                     return mockResponse(response, 400);
                 } catch (Exception e) {
+                    e.printStackTrace();
                     if (e.getCause() instanceof IllegalArgumentException) {
                         response =
                                 new ErrorResponse(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 3ba1943d10..b9a550950c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -163,6 +163,22 @@ class RESTCatalogTest extends CatalogTestBase {
         assertThat(snapshot).isEmpty();
     }
 
+    @Test
+    void testBranches() throws Exception {
+        String databaseName = "testBranchTable";
+        catalog.dropDatabase(databaseName, true, true);
+        catalog.createDatabase(databaseName, true);
+        Identifier identifier = Identifier.create(databaseName, "table");
+        catalog.createTable(
+                identifier, Schema.newBuilder().column("col", 
DataTypes.INT()).build(), true);
+
+        RESTCatalog restCatalog = (RESTCatalog) catalog;
+        restCatalog.createBranch(identifier, "my_branch", null);
+        
assertThat(restCatalog.listBranches(identifier)).containsOnly("my_branch");
+        restCatalog.dropBranch(identifier, "my_branch");
+        assertThat(restCatalog.listBranches(identifier)).isEmpty();
+    }
+
     @Override
     protected boolean supportsFormatTable() {
         return true;
diff --git a/paimon-open-api/rest-catalog-open-api.yaml 
b/paimon-open-api/rest-catalog-open-api.yaml
index 868a456e5e..42b302ca7f 100644
--- a/paimon-open-api/rest-catalog-open-api.yaml
+++ b/paimon-open-api/rest-catalog-open-api.yaml
@@ -346,6 +346,11 @@ paths:
           required: true
           schema:
             type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
       responses:
         "200":
           description: Success, no content
@@ -701,6 +706,162 @@ paths:
                 $ref: '#/components/schemas/ErrorResponse'
         "500":
           description: Internal Server Error
+  /v1/{prefix}/databases/{database}/tables/{table}/branches:
+    get:
+      tags:
+        - branch
+      summary: List branches
+      operationId: listBranches
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ListBranchesResponse'
+        "404":
+          description: Resource not found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        "500":
+          description: Internal Server Error
+    post:
+      tags:
+        - branch
+      summary: Create branch
+      operationId: createBranch
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateBranchRequest'
+      responses:
+        "200":
+          description: Success, no content
+        "404":
+          description: Resource not found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        "500":
+          description: Internal Server Error
+  /v1/{prefix}/databases/{database}/tables/{table}/branches/{branch}:
+    delete:
+      tags:
+        - branch
+      summary: Drop branch
+      operationId: dropBranch
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: branch
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        "200":
+          description: Success, no content
+        "404":
+          description: Resource not found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        "500":
+          description: Internal Server Error
+  /v1/{prefix}/databases/{database}/tables/{table}/branches/forward:
+    post:
+      tags:
+        - branch
+      summary: Forward branch
+      operationId: forwardBranch
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/ForwardBranchRequest'
+      responses:
+        "200":
+          description: Success, no content
+        "404":
+          description: Resource not found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        "409":
+          description: Resource already exists
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        "500":
+          description: Internal Server Error
   /v1/{prefix}/databases/{database}/views:
     get:
       tags:
@@ -1344,6 +1505,26 @@ components:
           type: array
           items:
             $ref: '#/components/schemas/Partition'
+    CreateBranchRequest:
+      type: object
+      properties:
+        branch:
+          type: string
+        fromTag:
+          nullable: true
+          type: string
+    ForwardBranchRequest:
+      type: object
+      properties:
+        branch:
+          type: string
+    ListBranchesResponse:
+      type: object
+      properties:
+        branches:
+          type: array
+          items:
+            type: string
     GetViewResponse:
       type: object
       properties:
diff --git 
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
 
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
index 1869170a2c..8dbff3b8c2 100644
--- 
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
+++ 
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
@@ -23,11 +23,13 @@ import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterPartitionsRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
 import org.apache.paimon.rest.requests.CommitTableRequest;
+import org.apache.paimon.rest.requests.CreateBranchRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
 import org.apache.paimon.rest.requests.CreatePartitionsRequest;
 import org.apache.paimon.rest.requests.CreateTableRequest;
 import org.apache.paimon.rest.requests.CreateViewRequest;
 import org.apache.paimon.rest.requests.DropPartitionsRequest;
+import org.apache.paimon.rest.requests.ForwardBranchRequest;
 import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
@@ -40,6 +42,7 @@ import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
 import org.apache.paimon.rest.responses.GetTableTokenResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
+import org.apache.paimon.rest.responses.ListBranchesResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
 import org.apache.paimon.rest.responses.ListPartitionsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
@@ -519,6 +522,83 @@ public class RESTCatalogController {
             @PathVariable String table,
             @RequestBody MarkDonePartitionsRequest request) {}
 
+    @Operation(
+            summary = "List branches",
+            tags = {"branch"})
+    @ApiResponses({
+        @ApiResponse(
+                responseCode = "200",
+                content = {
+                    @Content(schema = @Schema(implementation = 
ListBranchesResponse.class))
+                }),
+        @ApiResponse(
+                responseCode = "404",
+                description = "Resource not found",
+                content = {@Content(schema = @Schema(implementation = 
ErrorResponse.class))}),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema())})
+    })
+    @GetMapping("/v1/{prefix}/databases/{database}/tables/{table}/branches")
+    public ListBranchesResponse listBranches(
+            @PathVariable String prefix,
+            @PathVariable String database,
+            @PathVariable String table) {
+        return new ListBranchesResponse(ImmutableList.of("branch"));
+    }
+
+    @Operation(
+            summary = "Create branch",
+            tags = {"branch"})
+    @ApiResponses({
+        @ApiResponse(responseCode = "200", description = "Success, no 
content"),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema())})
+    })
+    @PostMapping("/v1/{prefix}/databases/{database}/tables/{table}/branches")
+    public void createBranch(
+            @PathVariable String prefix,
+            @PathVariable String database,
+            @PathVariable String table,
+            @RequestBody CreateBranchRequest request) {}
+
+    @Operation(
+            summary = "Forward branch",
+            tags = {"branch"})
+    @ApiResponses({
+        @ApiResponse(responseCode = "200", description = "Success, no 
content"),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema())})
+    })
+    
@PostMapping("/v1/{prefix}/databases/{database}/tables/{table}/branches/forward")
+    public void forwardBranch(
+            @PathVariable String prefix,
+            @PathVariable String database,
+            @PathVariable String table,
+            @RequestBody ForwardBranchRequest request) {}
+
+    @Operation(
+            summary = "Drop branch",
+            tags = {"branch"})
+    @ApiResponses({
+        @ApiResponse(responseCode = "200", description = "Success, no 
content"),
+        @ApiResponse(
+                responseCode = "404",
+                description = "Resource not found",
+                content = {@Content(schema = @Schema(implementation = 
ErrorResponse.class))}),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema())})
+    })
+    
@DeleteMapping("/v1/{prefix}/databases/{database}/tables/table/branches/branch")
+    public void dropBranch(
+            @PathVariable String prefix,
+            @PathVariable String database,
+            @PathVariable String table,
+            @PathVariable String branch) {}
+
     @Operation(
             summary = "List views",
             tags = {"view"})


Reply via email to