This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit fb2484ed0f1e60060fb9cc997450f083ec6c9a49 Author: xuzifu666 <[email protected]> AuthorDate: Mon Mar 30 17:49:05 2026 +0800 [core] Support rename branch API and procedures (#7549) Currently, users can't change a branch name directly; they need to create a new branch and then delete the old one, which is inconvenient. Therefore, this change provides an API to rename a branch. This PR provides: 1. An experimental rename branch API in Table 2. RenameBranchProcedure in Flink/Spark 3. A RESTCatalog API for rename branch --- docs/content/flink/procedures.md | 18 ++ docs/content/spark/procedures.md | 12 ++ docs/static/rest-catalog-open-api.yaml | 48 +++++ .../main/java/org/apache/paimon/rest/RESTApi.java | 22 +++ .../java/org/apache/paimon/rest/ResourcePaths.java | 13 ++ .../paimon/rest/requests/RenameBranchRequest.java | 46 +++++ .../org/apache/paimon/catalog/AbstractCatalog.java | 6 + .../java/org/apache/paimon/catalog/Catalog.java | 15 ++ .../org/apache/paimon/catalog/DelegateCatalog.java | 6 + .../java/org/apache/paimon/rest/RESTCatalog.java | 14 ++ .../paimon/table/AbstractFileStoreTable.java | 5 + .../paimon/table/DelegatedFileStoreTable.java | 5 + .../java/org/apache/paimon/table/FormatTable.java | 5 + .../org/apache/paimon/table/ReadonlyTable.java | 8 + .../main/java/org/apache/paimon/table/Table.java | 4 + .../org/apache/paimon/utils/BranchManager.java | 2 + .../apache/paimon/utils/CatalogBranchManager.java | 5 + .../paimon/utils/FileSystemBranchManager.java | 19 ++ .../org/apache/paimon/rest/RESTCatalogServer.java | 46 ++++- .../org/apache/paimon/rest/RESTCatalogTest.java | 21 ++- .../paimon/utils/FileSystemBranchManagerTest.java | 196 +++++++++++++++++++++ .../flink/procedure/RenameBranchProcedure.java | 59 +++++++ .../services/org.apache.paimon.factories.Factory | 1 + .../org/apache/paimon/flink/BranchSqlITCase.java | 54 ++++++ .../org/apache/paimon/spark/SparkProcedures.java | 2 + .../spark/procedure/RenameBranchProcedure.java | 90 ++++++++++ 
.../spark/procedure/BranchProcedureTest.scala | 50 ++++++ 27 files changed, 762 insertions(+), 10 deletions(-) diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index 739e3d4754..49e65d705b 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -793,6 +793,24 @@ All available procedures are listed below. CALL sys.delete_branch(`table` => 'default.T', branch => 'branch1') </td> </tr> + <tr> + <td>rename_branch</td> + <td> + -- Use named argument<br/> + CALL [catalog.]sys.rename_branch(`table` => 'identifier', from_branch => 'branchName', to_branch => 'newBranchName')<br/><br/> + -- Use indexed argument<br/> + CALL [catalog.]sys.rename_branch('identifier', 'branchName', 'newBranchName') + </td> + <td> + To rename a branch. Arguments: + <li>table: the target table identifier. Cannot be empty.</li> + <li>from_branch: name of the branch to be renamed.</li> + <li>to_branch: new name of the branch.</li> + </td> + <td> + CALL sys.rename_branch(`table` => 'default.T', from_branch => 'branch1', to_branch => 'branch2') + </td> + </tr> <tr> <td>fast_forward</td> <td> diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index 29ad120eab..ffc7005f42 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -317,6 +317,18 @@ This section introduce all available spark procedures about paimon. CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch') </td> </tr> + <tr> + <td>rename_branch</td> + <td> + To rename a branch. Arguments: + <li>table: the target table identifier. 
Cannot be empty.</li> + <li>from_branch: name of the branch to be renamed.</li> + <li>to_branch: new name of the branch.</li> + </td> + <td> + CALL sys.rename_branch(table => 'test_db.T', from_branch => 'test_branch', to_branch => 'new_branch') + </td> + </tr> <tr> <td>fast_forward</td> <td> diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml index a4bda071f9..33f64524ec 100644 --- a/docs/static/rest-catalog-open-api.yaml +++ b/docs/static/rest-catalog-open-api.yaml @@ -1183,6 +1183,49 @@ paths: $ref: '#/components/responses/BranchNotExistErrorResponse' "500": $ref: '#/components/responses/ServerErrorResponse' + /v1/{prefix}/databases/{database}/tables/{table}/branches/{branch}/rename: + post: + tags: + - branch + summary: rename branch + operationId: renameBranch + parameters: + - name: prefix + in: path + required: true + schema: + type: string + - name: database + in: path + required: true + schema: + type: string + - name: table + in: path + required: true + schema: + type: string + - name: branch + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/RenameBranchRequest' + responses: + "200": + description: Success, no content + "401": + $ref: '#/components/responses/UnauthorizedErrorResponse' + "404": + $ref: '#/components/responses/BranchNotExistErrorResponse' + "409": + description: Branch already exists + "500": + $ref: '#/components/responses/ServerErrorResponse' /v1/{prefix}/databases/{database}/tables/{table}/branches/{branch}/forward: post: tags: @@ -3251,6 +3294,11 @@ components: fromTag: nullable: true type: string + RenameBranchRequest: + type: object + properties: + toBranch: + type: string ForwardBranchRequest: type: object properties: diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java index a939c5ce16..88761ca5f0 100644 --- 
a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java @@ -49,6 +49,7 @@ import org.apache.paimon.rest.requests.ForwardBranchRequest; import org.apache.paimon.rest.requests.ListPartitionsByNamesRequest; import org.apache.paimon.rest.requests.MarkDonePartitionsRequest; import org.apache.paimon.rest.requests.RegisterTableRequest; +import org.apache.paimon.rest.requests.RenameBranchRequest; import org.apache.paimon.rest.requests.RenameTableRequest; import org.apache.paimon.rest.requests.ResetConsumerRequest; import org.apache.paimon.rest.requests.RollbackSchemaRequest; @@ -965,6 +966,27 @@ public class RESTApi { restAuthFunction); } + /** + * Rename branch for table. + * + * @param identifier database name and table name. + * @param fromBranch source branch name + * @param toBranch target branch name + * @throws NoSuchResourceException Exception thrown on HTTP 404 means the branch not exists + * @throws AlreadyExistsException Exception thrown on HTTP 409 means the target branch already + * exists + * @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for + * this table + */ + public void renameBranch(Identifier identifier, String fromBranch, String toBranch) { + RenameBranchRequest request = new RenameBranchRequest(toBranch); + client.post( + resourcePaths.renameBranch( + identifier.getDatabaseName(), identifier.getObjectName(), fromBranch), + request, + restAuthFunction); + } + /** * Forward branch for table. 
* diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java index 466ac975b4..6d1d16c82f 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java @@ -262,6 +262,19 @@ public class ResourcePaths { "forward"); } + public String renameBranch(String databaseName, String tableName, String branch) { + return SLASH.join( + V1, + prefix, + DATABASES, + encodeString(databaseName), + TABLES, + encodeString(tableName), + BRANCHES, + encodeString(branch), + "rename"); + } + public String tags(String databaseName, String objectName) { return SLASH.join( V1, diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/requests/RenameBranchRequest.java b/paimon-api/src/main/java/org/apache/paimon/rest/requests/RenameBranchRequest.java new file mode 100644 index 0000000000..63cf0011fa --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/rest/requests/RenameBranchRequest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.rest.requests; + +import org.apache.paimon.rest.RESTRequest; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** Request for renaming branch. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class RenameBranchRequest implements RESTRequest { + + private static final String FIELD_TO_BRANCH = "toBranch"; + + @JsonProperty(FIELD_TO_BRANCH) + private final String toBranch; + + @JsonCreator + public RenameBranchRequest(@JsonProperty(FIELD_TO_BRANCH) String toBranch) { + this.toBranch = toBranch; + } + + @JsonGetter(FIELD_TO_BRANCH) + public String toBranch() { + return toBranch; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 709010d2ce..ad51ab1f4d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -516,6 +516,12 @@ public abstract class AbstractCatalog implements Catalog { throw new UnsupportedOperationException(); } + @Override + public void renameBranch(Identifier identifier, String fromBranch, String toBranch) + throws BranchNotExistException, BranchAlreadyExistException { + throw new UnsupportedOperationException(); + } + @Override public void fastForward(Identifier identifier, String branch) throws BranchNotExistException { throw new UnsupportedOperationException(); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 8924dc6ae1..6d337f3716 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -663,6 +663,7 @@ public interface Catalog extends AutoCloseable { * <li>{@link #rollbackTo(Identifier, Instant)}. * <li>{@link #createBranch(Identifier, String, String)}. * <li>{@link #dropBranch(Identifier, String)}. + * <li>{@link #renameBranch(Identifier, String, String)}. * <li>{@link #listBranches(Identifier)}. * <li>{@link #getTag(Identifier, String)}. * <li>{@link #createTag(Identifier, String, Long, String, boolean)}. @@ -875,6 +876,20 @@ public interface Catalog extends AutoCloseable { */ void dropBranch(Identifier identifier, String branch) throws BranchNotExistException; + /** + * Rename a branch for this table. + * + * @param identifier path of the table, cannot be system or branch name. + * @param fromBranch the source branch name + * @param toBranch the target branch name + * @throws BranchNotExistException if the source branch doesn't exist + * @throws BranchAlreadyExistException if the target branch already exists + * @throws UnsupportedOperationException if the catalog does not {@link + * #supportsVersionManagement()} + */ + void renameBranch(Identifier identifier, String fromBranch, String toBranch) + throws BranchNotExistException, BranchAlreadyExistException; + /** * Fast-forward a branch to main branch. 
* diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 4a8eeabe50..ec5138a8cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -246,6 +246,12 @@ public abstract class DelegateCatalog implements Catalog { wrapped.dropBranch(identifier, branch); } + @Override + public void renameBranch(Identifier identifier, String fromBranch, String toBranch) + throws BranchNotExistException, BranchAlreadyExistException { + wrapped.renameBranch(identifier, fromBranch, toBranch); + } + @Override public void fastForward(Identifier identifier, String branch) throws BranchNotExistException { wrapped.fastForward(identifier, branch); diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index dbb2a8fd48..364cdb3cd5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -733,6 +733,20 @@ public class RESTCatalog implements Catalog { } } + @Override + public void renameBranch(Identifier identifier, String fromBranch, String toBranch) + throws BranchNotExistException, BranchAlreadyExistException { + try { + api.renameBranch(identifier, fromBranch, toBranch); + } catch (NoSuchResourceException e) { + throw new BranchNotExistException(identifier, fromBranch, e); + } catch (AlreadyExistsException e) { + throw new BranchAlreadyExistException(identifier, toBranch, e); + } catch (ForbiddenException e) { + throw new TableNoPermissionException(identifier, e); + } + } + @Override public void fastForward(Identifier identifier, String branch) throws BranchNotExistException { try { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 52b9d162a5..3583d17c08 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -718,6 +718,11 @@ abstract class AbstractFileStoreTable implements FileStoreTable { branchManager().dropBranch(branchName); } + @Override + public void renameBranch(String fromBranch, String toBranch) { + branchManager().renameBranch(fromBranch, toBranch); + } + @Override public void fastForward(String branchName) { branchManager().fastForward(branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java index 9eaa4f8d2e..1f234d81ff 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -285,6 +285,11 @@ public abstract class DelegatedFileStoreTable implements FileStoreTable { wrapped.deleteBranch(branchName); } + @Override + public void renameBranch(String fromBranch, String toBranch) { + wrapped.renameBranch(fromBranch, toBranch); + } + @Override public void fastForward(String branchName) { wrapped.fastForward(branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java index d7af8510a9..7f13faf753 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java @@ -417,6 +417,11 @@ public interface FormatTable extends Table { throw new UnsupportedOperationException(); } + @Override + default void renameBranch(String fromBranch, String toBranch) { + throw new UnsupportedOperationException(); + } + @Override default void fastForward(String branchName) { 
throw new UnsupportedOperationException(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index 55f14e7025..b685f86e3e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -265,6 +265,14 @@ public interface ReadonlyTable extends InnerTable { this.getClass().getSimpleName())); } + @Override + default void renameBranch(String fromBranch, String toBranch) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support renameBranch.", + this.getClass().getSimpleName())); + } + @Override default void fastForward(String branchName) { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 13d56d6849..98c00c0101 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -202,6 +202,10 @@ public interface Table extends Serializable { } } + /** Rename a branch. */ + @Experimental + void renameBranch(String fromBranch, String toBranch); + /** Merge a branch to main branch. 
*/ @Experimental void fastForward(String branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index e055724efe..c830a799bf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -59,6 +59,8 @@ public interface BranchManager { void fastForward(String branchName); + void renameBranch(String fromBranch, String toBranch); + List<String> branches(); default boolean branchExists(String branchName) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java index 8723135bd0..4ba9c39475 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java @@ -108,6 +108,11 @@ public class CatalogBranchManager implements BranchManager { }); } + @Override + public void renameBranch(String fromBranch, String toBranch) { + executePost(catalog -> catalog.renameBranch(identifier, fromBranch, toBranch)); + } + @Override public List<String> branches() { return executeGet(catalog -> catalog.listBranches(identifier)); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java index ef3a04ec99..3257ea1060 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java @@ -220,6 +220,25 @@ public class FileSystemBranchManager implements BranchManager { checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName); } + @Override + public void renameBranch(String fromBranch, String toBranch) { + 
checkArgument(!BranchManager.isMainBranch(fromBranch), "Cannot rename the main branch."); + checkArgument(branchExists(fromBranch), "Branch name '%s' doesn't exist.", fromBranch); + checkArgument(!branchExists(toBranch), "Branch name '%s' already exists.", toBranch); + BranchManager.validateBranch(toBranch); + + try { + // Use rename for atomic operation and better performance + fileIO.rename(branchPath(fromBranch), branchPath(toBranch)); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Exception occurs when rename branch from '%s' to '%s'.", + fromBranch, toBranch), + e); + } + } + @Override public List<String> branches() { try { diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index bd9036a4b0..5961101c3a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -61,6 +61,7 @@ import org.apache.paimon.rest.requests.CreateTagRequest; import org.apache.paimon.rest.requests.CreateViewRequest; import org.apache.paimon.rest.requests.ListPartitionsByNamesRequest; import org.apache.paimon.rest.requests.MarkDonePartitionsRequest; +import org.apache.paimon.rest.requests.RenameBranchRequest; import org.apache.paimon.rest.requests.RenameTableRequest; import org.apache.paimon.rest.requests.ResetConsumerRequest; import org.apache.paimon.rest.requests.RollbackSchemaRequest; @@ -1837,15 +1838,42 @@ public class RESTCatalogServer { case "POST": if (resources.length == 6) { branch = RESTUtil.decodeString(resources[4]); - branchManager.fastForward(branch); - branchIdentifier = - new Identifier( - identifier.getDatabaseName(), - identifier.getTableName(), - branch); - tableLatestSnapshotStore.put( - identifier.getFullName(), - tableLatestSnapshotStore.get(branchIdentifier.getFullName())); + if ("rename".equals(resources[5])) { + // Rename 
branch: /branches/{branch}/rename + RenameBranchRequest requestBody = + RESTApi.fromJson(data, RenameBranchRequest.class); + String toBranch = requestBody.toBranch(); + table.renameBranch(branch, toBranch); + // Update store for renamed branch + Identifier fromBranchIdentifier = + new Identifier( + identifier.getDatabaseName(), + identifier.getTableName(), + branch); + Identifier toBranchIdentifier = + new Identifier( + identifier.getDatabaseName(), + identifier.getTableName(), + toBranch); + tableLatestSnapshotStore.put( + toBranchIdentifier.getFullName(), + tableLatestSnapshotStore.get( + fromBranchIdentifier.getFullName())); + tableMetadataStore.put( + toBranchIdentifier.getFullName(), + tableMetadataStore.get(fromBranchIdentifier.getFullName())); + } else if ("forward".equals(resources[5])) { + // Fast forward branch + branchManager.fastForward(branch); + branchIdentifier = + new Identifier( + identifier.getDatabaseName(), + identifier.getTableName(), + branch); + tableLatestSnapshotStore.put( + identifier.getFullName(), + tableLatestSnapshotStore.get(branchIdentifier.getFullName())); + } } else { CreateBranchRequest requestBody = RESTApi.fromJson(data, CreateBranchRequest.class); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 0c20dfdc17..e70a11321a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -2132,7 +2132,26 @@ public abstract class RESTCatalogTest extends CatalogTestBase { Catalog.BranchAlreadyExistException.class, () -> restCatalog.createBranch(identifier, "my_branch", null)); assertThat(restCatalog.listBranches(identifier)).containsOnly("my_branch"); - restCatalog.dropBranch(identifier, "my_branch"); + + // Test rename branch + restCatalog.renameBranch(identifier, "my_branch", "renamed_branch"); + 
assertThat(restCatalog.listBranches(identifier)).containsOnly("renamed_branch"); + assertThat(restCatalog.getTable(new Identifier(databaseName, "table", "renamed_branch"))) + .isNotNull(); + + // Test rename to existing branch should fail + restCatalog.createBranch(identifier, "another_branch", null); + assertThrows( + Catalog.BranchAlreadyExistException.class, + () -> restCatalog.renameBranch(identifier, "renamed_branch", "another_branch")); + + // Test rename non-existent branch should fail + assertThrows( + Catalog.BranchNotExistException.class, + () -> restCatalog.renameBranch(identifier, "non_existent_branch", "new_branch")); + + restCatalog.dropBranch(identifier, "renamed_branch"); + restCatalog.dropBranch(identifier, "another_branch"); assertThrows( Catalog.BranchNotExistException.class, diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java new file mode 100644 index 0000000000..c74b53d3f3 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/FileSystemBranchManagerTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.utils; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.FileIOFinder; +import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit tests for {@link FileSystemBranchManager}. */ +class FileSystemBranchManagerTest { + + @TempDir java.nio.file.Path tempDir; + + private Path tablePath; + private FileIO fileIO; + private SnapshotManager snapshotManager; + private TagManager tagManager; + private SchemaManager schemaManager; + private FileSystemBranchManager branchManager; + + @BeforeEach + void before() throws Exception { + tablePath = new Path(tempDir.toUri().toString()); + fileIO = FileIOFinder.find(tablePath); + + // Create schema + Schema schema = + Schema.newBuilder() + .column("product_id", DataTypes.INT()) + .column("price", DataTypes.INT()) + .column("sales", DataTypes.INT()) + .primaryKey("product_id") + .build(); + + // Create schema manager and initialize schema + schemaManager = new SchemaManager(fileIO, tablePath); + schemaManager.createTable(schema); + + // Create snapshot, tag, and branch managers + snapshotManager = new SnapshotManager(fileIO, tablePath, null, null, null); + tagManager = new TagManager(fileIO, tablePath); + branchManager = + new FileSystemBranchManager( + fileIO, tablePath, snapshotManager, tagManager, schemaManager); + } + + @Test + void testRenameBranchBasic() { + // Create a branch + branchManager.createBranch("old_branch"); + assertThat(branchManager.branchExists("old_branch")).isTrue(); + + // Rename the branch + branchManager.renameBranch("old_branch", "new_branch"); + + // Verify old branch 
no longer exists + assertThat(branchManager.branchExists("old_branch")).isFalse(); + + // Verify new branch exists + assertThat(branchManager.branchExists("new_branch")).isTrue(); + + // Verify branches list contains new branch + List<String> branches = branchManager.branches(); + assertThat(branches).contains("new_branch"); + assertThat(branches).doesNotContain("old_branch"); + } + + @Test + void testRenameNonExistentBranch() { + // Try to rename non-existent branch should throw exception + assertThatThrownBy(() -> branchManager.renameBranch("non_existent", "new_branch")) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("doesn't exist"); + } + + @Test + void testRenameToExistingBranch() { + // Create two branches + branchManager.createBranch("branch1"); + branchManager.createBranch("branch2"); + + // Try to rename to existing branch should throw exception + assertThatThrownBy(() -> branchManager.renameBranch("branch1", "branch2")) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("already exists"); + } + + @Test + void testRenameMainBranchShouldFail() { + // Try to rename main branch should throw exception + assertThatThrownBy(() -> branchManager.renameBranch("main", "renamed_main")) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("main"); + } + + @Test + void testRenameBranchFromTag() { + // Create a tag + branchManager.createBranch("old_branch"); + + // Rename branch created from tag + branchManager.renameBranch("old_branch", "new_branch"); + + // Verify new branch exists + assertThat(branchManager.branchExists("new_branch")).isTrue(); + assertThat(branchManager.branchExists("old_branch")).isFalse(); + } + + @Test + void testRenameBranchPreservesData() { + // Create a branch + branchManager.createBranch("test_branch"); + assertThat(branchManager.branchExists("test_branch")).isTrue(); + + // Rename the branch + branchManager.renameBranch("test_branch", "renamed_branch"); + + // Verify the renamed branch exists and the 
original does not + assertThat(branchManager.branchExists("test_branch")).isFalse(); + assertThat(branchManager.branchExists("renamed_branch")).isTrue(); + } + + @Test + void testRenameBranchMultipleTimes() { + // Create a branch + branchManager.createBranch("branch1"); + + // Rename multiple times + branchManager.renameBranch("branch1", "branch2"); + branchManager.renameBranch("branch2", "branch3"); + + // Verify final state + assertThat(branchManager.branchExists("branch1")).isFalse(); + assertThat(branchManager.branchExists("branch2")).isFalse(); + assertThat(branchManager.branchExists("branch3")).isTrue(); + + List<String> branches = branchManager.branches(); + assertThat(branches).contains("branch3"); + assertThat(branches).doesNotContain("branch1", "branch2"); + } + + @Test + void testRenameBranchValidatesBranchNames() { + // Try to rename with invalid target branch name + branchManager.createBranch("valid_branch"); + + // Test numeric branch name + assertThatThrownBy(() -> branchManager.renameBranch("valid_branch", "123")) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("pure numeric"); + + // Test blank branch name + assertThatThrownBy(() -> branchManager.renameBranch("valid_branch", "")) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("blank"); + } + + @Test + void testRenameBranchToSameName() { + // Create a branch + branchManager.createBranch("same_name"); + + // Try to rename to the same name should throw exception + assertThatThrownBy(() -> branchManager.renameBranch("same_name", "same_name")) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("already exists"); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RenameBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RenameBranchProcedure.java new file mode 100644 index 0000000000..8b60c7aa50 --- /dev/null +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RenameBranchProcedure.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Flink procedure that renames a branch of a table. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.rename_branch('tableId', 'fromBranch', 'toBranch')
+ * </code></pre>
+ */
+public class RenameBranchProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "rename_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Looks up the table in the catalog and renames one of its branches.
+     *
+     * @param procedureContext Flink procedure context; unused here
+     * @param tableId table identifier string, parsed with {@link Identifier#fromString}
+     * @param fromBranch current name of the branch
+     * @param toBranch new name of the branch
+     * @return a single-element array containing {@code "Success"}
+     * @throws Catalog.TableNotExistException if the catalog lookup reports a missing table
+     */
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "from_branch", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "to_branch", type = @DataTypeHint("STRING"))
+            })
+    public String[] call(
+            ProcedureContext procedureContext, String tableId, String fromBranch, String toBranch)
+            throws Catalog.TableNotExistException {
+        Identifier tableIdentifier = Identifier.fromString(tableId);
+        Table target = catalog.getTable(tableIdentifier);
+        target.renameBranch(fromBranch, toBranch);
+        return new String[] {"Success"};
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 092b470aa7..f87a250ec2 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -62,6 +62,7 @@ org.apache.paimon.flink.procedure.ExpireTagsProcedure
 org.apache.paimon.flink.procedure.ReplaceTagProcedure
 org.apache.paimon.flink.procedure.CreateBranchProcedure
 org.apache.paimon.flink.procedure.DeleteBranchProcedure
+org.apache.paimon.flink.procedure.RenameBranchProcedure
 org.apache.paimon.flink.procedure.DropPartitionProcedure
 org.apache.paimon.flink.procedure.MergeIntoProcedure
 org.apache.paimon.flink.procedure.ResetConsumerProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index bcd3e0d964..1b7d50c852 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -869,6 +869,60 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                 .isEmpty();
     }
 
+    @Test
+    public void testRenameBranch() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " pt INT"
+                        + ", k INT"
+                        + ", v STRING"
+                        + ", PRIMARY KEY (pt, k) NOT ENFORCED"
+                        + " ) PARTITIONED BY (pt) WITH ("
+                        + " 'bucket' = '2'"
+                        + " )");
+
+        // snapshot 1: two rows in partition pt = 1
+        sql("INSERT INTO T VALUES (1, 10, 'apple'), (1, 20, 'banana')");
+        // snapshot 2: two rows in partition pt = 2
+        sql("INSERT INTO T VALUES (2, 10, 'cat'), (2, 20, 'dog')");
+
+        // create tag1 from snapshot 1
+        sql("CALL sys.create_tag('default.T', 'tag1', 1)");
+
+        // create branch1 from tag1, so it holds only the snapshot 1 rows
+        sql("CALL sys.create_branch('default.T', 'branch1', 'tag1')");
+
+        // branch1 must be visible through the branch manager
+        FileStoreTable table = paimonTable("T");
+        assertThat(table.branchManager().branchExists("branch1")).isTrue();
+
+        // rename branch1 to branch2 through the procedure under test
+        sql("CALL sys.rename_branch('default.T', 'branch1', 'branch2')");
+
+        // reload the table; the old name must be gone
+        table = paimonTable("T");
+        assertThat(table.branchManager().branchExists("branch1")).isFalse();
+
+        // ... and the new name must exist
+        assertThat(table.branchManager().branchExists("branch2")).isTrue();
+
+        // the renamed branch still serves the rows written before the tag
+        assertThat(collectResult("SELECT * FROM T$branch_branch2"))
+                .containsExactlyInAnyOrder("+I[1, 10, apple]", "+I[1, 20, banana]");
+
+        // renaming a branch that does not exist must fail
+        assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "CALL sys.rename_branch('default.T', 'nonexistent', 'new_branch')"))
+                .hasMessageContaining("Branch");
+
+        // renaming onto an existing branch name (branch3) must fail
+        sql("CALL sys.create_branch('default.T', 'branch3')");
+        assertThatThrownBy(() -> sql("CALL sys.rename_branch('default.T', 'branch2', 'branch3')"))
+                .hasMessageContaining("Branch");
+    }
+
     private List<String> collectResult(String sql) throws Exception {
         List<String> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 74bf04d68c..f27478bbbd 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -46,6 +46,7 @@ import org.apache.paimon.spark.procedure.ProcedureBuilder;
 import org.apache.paimon.spark.procedure.PurgeFilesProcedure;
 import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
 import org.apache.paimon.spark.procedure.RemoveUnexistingFilesProcedure;
+import org.apache.paimon.spark.procedure.RenameBranchProcedure;
 import org.apache.paimon.spark.procedure.RenameTagProcedure;
 import org.apache.paimon.spark.procedure.RepairProcedure;
 import org.apache.paimon.spark.procedure.ReplaceTagProcedure;
@@ -98,6 +99,7 @@ public class SparkProcedures {
         procedureBuilders.put("create_global_index", CreateGlobalIndexProcedure::builder);
         procedureBuilders.put("drop_global_index", DropGlobalIndexProcedure::builder);
         procedureBuilders.put("delete_branch", DeleteBranchProcedure::builder);
+        procedureBuilders.put("rename_branch", RenameBranchProcedure::builder);
         procedureBuilders.put("compact", CompactProcedure::builder);
         procedureBuilders.put("compact_database", CompactDatabaseProcedure::builder);
         procedureBuilders.put("rescale", RescaleProcedure::builder);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RenameBranchProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RenameBranchProcedure.java
new file mode
100644
index 0000000000..f30209875e
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RenameBranchProcedure.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark procedure to rename a branch. Usage:
+ *
+ * <pre>{@code
+ * CALL sys.rename_branch(table => 'db.T', from_branch => 'old', to_branch => 'new')
+ * }</pre>
+ */
+public class RenameBranchProcedure extends BaseProcedure {
+
+    /** Required string arguments; {@link #call} reads them by index in this order. */
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("table", StringType),
+                ProcedureParameter.required("from_branch", StringType),
+                ProcedureParameter.required("to_branch", StringType)
+            };
+
+    /** Output schema: one nullable boolean column named {@code result}. */
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+                    });
+
+    protected RenameBranchProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    /**
+     * Renames branch {@code from_branch} of the given table to {@code to_branch}.
+     *
+     * @param args argument row holding table, from_branch and to_branch at positions 0, 1 and 2
+     * @return a single row containing {@code true}
+     */
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+        String sourceBranch = args.getString(1);
+        String targetBranch = args.getString(2);
+        return modifyPaimonTable(
+                tableIdent,
+                table -> {
+                    table.renameBranch(sourceBranch, targetBranch);
+                    return new InternalRow[] {newInternalRow(true)};
+                });
+    }
+
+    /** Returns a builder that creates this procedure from the builder's table catalog. */
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<RenameBranchProcedure>() {
+            @Override
+            public RenameBranchProcedure doBuild() {
+                return new RenameBranchProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "RenameBranchProcedure";
+    }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala
index 67786a47fe..0db7436019 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala
@@
-181,4 +181,54 @@ class BranchProcedureTest extends PaimonSparkTestBase with StreamTest {
         Seq(Row("20240725", "apple", 5), Row("20240725", "banana", 7)))
     }
   }
+
+  test("Paimon Procedure: rename branch") {
+    withTable("T") {
+      sql("CREATE TABLE T (a INT, b STRING) TBLPROPERTIES ('primary-key'='a', 'bucket'='3')")
+
+      sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')")
+
+      // tag1 points at snapshot 1, which holds both inserted rows
+      sql("CALL sys.create_tag(table => 'test.T', tag => 'tag1', snapshot => 1)")
+
+      // branch1 is created from tag1
+      sql("CALL sys.create_branch(table => 'test.T', branch => 'branch1', tag => 'tag1')")
+
+      // branch1 must be visible through the branch manager
+      val table = loadTable("T")
+      assert(table.branchManager().branchExists("branch1"))
+
+      // the rename procedure returns a single row containing true
+      checkAnswer(
+        sql(
+          "CALL sys.rename_branch(table => 'test.T', from_branch => 'branch1', to_branch => 'branch2')"),
+        Row(true) :: Nil
+      )
+
+      // the old name must be gone
+      assert(!table.branchManager().branchExists("branch1"))
+
+      // the new name must exist
+      assert(table.branchManager().branchExists("branch2"))
+
+      // the renamed branch still returns the rows captured by tag1
+      checkAnswer(
+        sql("SELECT * FROM `T$branch_branch2` ORDER BY a"),
+        Row(1, "a") :: Row(2, "b") :: Nil
+      )
+
+      // renaming a branch that does not exist must fail
+      intercept[Exception] {
+        sql(
+          "CALL sys.rename_branch(table => 'test.T', from_branch => 'nonexistent', to_branch => 'new_branch')")
+      }
+
+      // renaming onto an existing branch name (branch3) must fail
+      sql("CALL sys.create_branch(table => 'test.T', branch => 'branch3')")
+      intercept[Exception] {
+        sql(
+          "CALL sys.rename_branch(table => 'test.T', from_branch => 'branch2', to_branch => 'branch3')")
+      }
+    }
+  }
 }
