This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 394ee4f8c [core][branch] Add procedure in flink for create/delete 
branch (#2725)
394ee4f8c is described below

commit 394ee4f8c46272a267df57b7384e665850aa1d16
Author: Fang Yong <[email protected]>
AuthorDate: Fri Jan 19 10:55:13 2024 +0800

    [core][branch] Add procedure in flink for create/delete branch (#2725)
    
    * [core][branch] Add procedure in flink for create/delete branch
---
 .../flink/procedure/CreateBranchProcedure.java     | 55 +++++++++++++++
 .../flink/procedure/DeleteBranchProcedure.java     | 50 +++++++++++++
 .../services/org.apache.paimon.factories.Factory   |  2 +
 .../paimon/flink/action/BranchActionITCase.java    | 81 ++++++++++++++++++++++
 4 files changed, 188 insertions(+)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
new file mode 100644
index 000000000..b870f088b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Create branch procedure for given tag. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.create_branch('tableId', 'branchName', 'tagName')
+ * </code></pre>
+ */
+public class CreateBranchProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "create_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext, String tableId, String 
branchName, String tagName)
+            throws Catalog.TableNotExistException {
+        return innerCall(tableId, branchName, tagName);
+    }
+
+    private String[] innerCall(String tableId, String branchName, String 
tagName)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        table.createBranch(branchName, tagName);
+        return new String[] {"Success"};
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
new file mode 100644
index 000000000..c4d04b5a1
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Delete branch procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.delete_branch('tableId', 'branchName')
+ * </code></pre>
+ */
+public class DeleteBranchProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "delete_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    public String[] call(ProcedureContext procedureContext, String tableId, 
String branchName)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        table.deleteBranch(branchName);
+
+        return new String[] {"Success"};
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 51c0f1ad1..634bd2768 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -32,6 +32,8 @@ org.apache.paimon.flink.procedure.CompactDatabaseProcedure
 org.apache.paimon.flink.procedure.CompactProcedure
 org.apache.paimon.flink.procedure.CreateTagProcedure
 org.apache.paimon.flink.procedure.DeleteTagProcedure
+org.apache.paimon.flink.procedure.CreateBranchProcedure
+org.apache.paimon.flink.procedure.DeleteBranchProcedure
 org.apache.paimon.flink.procedure.DropPartitionProcedure
 org.apache.paimon.flink.procedure.MergeIntoProcedure
 org.apache.paimon.flink.procedure.ResetConsumerProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
new file mode 100644
index 000000000..8d445ab95
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for branch management actions. */
+class BranchActionITCase extends ActionITCaseBase {
+    @Test
+    void testCreateAndDeleteBranch() throws Exception {
+
+        init(warehouse);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // 3 snapshots
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        TagManager tagManager = new TagManager(table.fileIO(), 
table.location());
+        callProcedure(
+                String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", 
database, tableName));
+        assertThat(tagManager.tagExists("tag2")).isTrue();
+
+        BranchManager branchManager = table.branchManager();
+        callProcedure(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'branch_name', 
'tag2')",
+                        database, tableName));
+        assertThat(branchManager.branchExists("branch_name")).isTrue();
+
+        callProcedure(
+                String.format(
+                        "CALL sys.delete_branch('%s.%s', 'branch_name')", 
database, tableName));
+        assertThat(branchManager.branchExists("branch_name")).isFalse();
+    }
+}

Reply via email to