This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 394ee4f8c [core][branch] Add procedure in flink for create/delete branch (#2725)
394ee4f8c is described below
commit 394ee4f8c46272a267df57b7384e665850aa1d16
Author: Fang Yong <[email protected]>
AuthorDate: Fri Jan 19 10:55:13 2024 +0800
[core][branch] Add procedure in flink for create/delete branch (#2725)
* [core][branch] Add procedure in flink for create/delete branch
---
.../flink/procedure/CreateBranchProcedure.java | 55 +++++++++++++++
.../flink/procedure/DeleteBranchProcedure.java | 50 +++++++++++++
.../services/org.apache.paimon.factories.Factory | 2 +
.../paimon/flink/action/BranchActionITCase.java | 81 ++++++++++++++++++++++
4 files changed, 188 insertions(+)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
new file mode 100644
index 000000000..b870f088b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Creates branch {@code branchName} from tag {@code tagName} of table {@code tableId}. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.create_branch('tableId', 'branchName', 'tagName')
+ * </code></pre>
+ */
+public class CreateBranchProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "create_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext, String tableId, String branchName, String tagName)
+            throws Catalog.TableNotExistException {
+        return innerCall(tableId, branchName, tagName); // procedureContext is not consulted
+    }
+
+    private String[] innerCall(String tableId, String branchName, String tagName)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        table.createBranch(branchName, tagName);
+        return new String[] {"Success"}; // reached only if createBranch did not throw
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
new file mode 100644
index 000000000..c4d04b5a1
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Deletes branch {@code branchName} of the table identified by {@code tableId}. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.delete_branch('tableId', 'branchName')
+ * </code></pre>
+ */
+public class DeleteBranchProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "delete_branch";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        table.deleteBranch(branchName); // procedureContext is not consulted by this procedure
+
+        return new String[] {"Success"}; // reached only if deleteBranch did not throw
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 51c0f1ad1..634bd2768 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -32,6 +32,8 @@ org.apache.paimon.flink.procedure.CompactDatabaseProcedure
org.apache.paimon.flink.procedure.CompactProcedure
org.apache.paimon.flink.procedure.CreateTagProcedure
org.apache.paimon.flink.procedure.DeleteTagProcedure
+org.apache.paimon.flink.procedure.CreateBranchProcedure
+org.apache.paimon.flink.procedure.DeleteBranchProcedure
org.apache.paimon.flink.procedure.DropPartitionProcedure
org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
new file mode 100644
index 000000000..8d445ab95
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case that creates a branch from a tag and deletes it again through CALL procedures. */
+class BranchActionITCase extends ActionITCaseBase {
+    @Test
+    void testCreateAndDeleteBranch() throws Exception {
+
+        init(warehouse);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // three writes, intended to yield 3 snapshots so that tag2 can be created with argument 2
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        TagManager tagManager = new TagManager(table.fileIO(), table.location());
+        callProcedure(
+                String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
+        assertThat(tagManager.tagExists("tag2")).isTrue();
+
+        BranchManager branchManager = table.branchManager();
+        callProcedure(
+                String.format(
+                        "CALL sys.create_branch('%s.%s', 'branch_name', 'tag2')",
+                        database, tableName));
+        assertThat(branchManager.branchExists("branch_name")).isTrue();
+
+        callProcedure(
+                String.format(
+                        "CALL sys.delete_branch('%s.%s', 'branch_name')", database, tableName));
+        assertThat(branchManager.branchExists("branch_name")).isFalse();
+    }
+}