This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 92ab2ed73 [flink] Transform actions to procedures (#2027)
92ab2ed73 is described below

commit 92ab2ed73173221ce4be1ebafb15213de59e5474
Author: yuzelin <[email protected]>
AuthorDate: Wed Sep 20 21:11:41 2023 +0800

    [flink] Transform actions to procedures (#2027)
---
 .../flink/action/MergeIntoActionFactory.java       |  18 +-
 .../paimon/flink/action/TableActionBase.java       |   4 -
 .../flink/procedure/CompactDatabaseProcedure.java  | 121 +++++++++++++
 .../paimon/flink/procedure/CompactProcedure.java   |  23 +--
 .../paimon/flink/procedure/CreateTagProcedure.java |  50 ++++++
 .../paimon/flink/procedure/DeleteTagProcedure.java |  49 ++++++
 .../flink/procedure/DropPartitionProcedure.java    |  61 +++++++
 .../paimon/flink/procedure/MergeIntoProcedure.java | 192 +++++++++++++++++++++
 .../paimon/flink/procedure/ProcedureBase.java      |  29 +++-
 .../paimon/flink/procedure/ProcedureUtil.java      |  26 ++-
 .../flink/procedure/ResetConsumerProcedure.java    |  58 +++++++
 .../flink/procedure/RollbackToProcedure.java       |  61 +++++++
 .../paimon/flink/action/ActionITCaseBase.java      |   8 +-
 .../flink/action/CompactDatabaseActionITCase.java  | 169 +++++++++---------
 .../paimon/flink/action/ConsumerActionITCase.java  |  14 +-
 .../flink/action/DropPartitionActionITCase.java    |  42 +++--
 .../paimon/flink/action/MergeIntoActionITCase.java |  90 +++++++++-
 .../flink/action/RollbackToActionITCase.java       |   9 +-
 .../paimon/flink/action/TagActionITCase.java       |  21 ++-
 19 files changed, 891 insertions(+), 154 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
index 5fae57222..27fe6069b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
@@ -33,6 +33,12 @@ public class MergeIntoActionFactory implements ActionFactory 
{
 
     public static final String IDENTIFIER = "merge-into";
 
+    public static final String MATCHED_UPSERT = "matched-upsert";
+    public static final String NOT_MATCHED_BY_SOURCE_UPSERT = 
"not-matched-by-source-upsert";
+    public static final String MATCHED_DELETE = "matched-delete";
+    public static final String NOT_MATCHED_BY_SOURCE_DELETE = 
"not-matched-by-source-delete";
+    public static final String NOT_MATCHED_INSERT = "not-matched-insert";
+
     @Override
     public String identifier() {
         return IDENTIFIER;
@@ -66,25 +72,25 @@ public class MergeIntoActionFactory implements 
ActionFactory {
                 Arrays.stream(params.get("merge-actions").split(","))
                         .map(String::trim)
                         .collect(Collectors.toList());
-        if (actions.contains("matched-upsert")) {
+        if (actions.contains(MATCHED_UPSERT)) {
             checkRequiredArgument(params, "matched-upsert-set");
             action.withMatchedUpsert(
                     params.get("matched-upsert-condition"), 
params.get("matched-upsert-set"));
         }
-        if (actions.contains("not-matched-by-source-upsert")) {
+        if (actions.contains(NOT_MATCHED_BY_SOURCE_UPSERT)) {
             checkRequiredArgument(params, "not-matched-by-source-upsert-set");
             action.withNotMatchedBySourceUpsert(
                     params.get("not-matched-by-source-upsert-condition"),
                     params.get("not-matched-by-source-upsert-set"));
         }
-        if (actions.contains("matched-delete")) {
+        if (actions.contains(MATCHED_DELETE)) {
             action.withMatchedDelete(params.get("matched-delete-condition"));
         }
-        if (actions.contains("not-matched-by-source-delete")) {
+        if (actions.contains(NOT_MATCHED_BY_SOURCE_DELETE)) {
             action.withNotMatchedBySourceDelete(
                     params.get("not-matched-by-source-delete-condition"));
         }
-        if (actions.contains("not-matched-insert")) {
+        if (actions.contains(NOT_MATCHED_INSERT)) {
             checkRequiredArgument(params, "not-matched-insert-values");
             action.withNotMatchedInsert(
                     params.get("not-matched-insert-condition"),
@@ -182,7 +188,7 @@ public class MergeIntoActionFactory implements 
ActionFactory {
                 "  It will find matched rows of target table that meet 
condition (T.k = S.k), then update T.v with S.v where (T.v <> S.v).");
     }
 
-    private void validate(MergeIntoAction action) {
+    public static void validate(MergeIntoAction action) {
         if (!action.matchedUpsert
                 && !action.notMatchedUpsert
                 && !action.matchedDelete
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 02b8e1943..5095cc352 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -30,8 +30,6 @@ import org.apache.paimon.utils.Preconditions;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.data.RowData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -41,8 +39,6 @@ import java.util.Map;
 /** Abstract base of {@link Action} for table. */
 public abstract class TableActionBase extends ActionBase {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(TableActionBase.class);
-
     protected Table table;
     protected final Identifier identifier;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
new file mode 100644
index 000000000..10b06931a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.action.CompactDatabaseAction;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Map;
+
+import static 
org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;
+
+/**
+ * Compact database procedure. Usage:
+ *
+ * <pre><code>
+ *  -- NOTE: use '' as placeholder for optional arguments
+ *
+ *  -- compact all databases
+ *  CALL compact_database()
+ *
+ *  -- compact some databases (accept regular expression)
+ *  CALL compact_database('includingDatabases')
+ *
+ *  -- set compact mode
+ *  CALL compact_database('includingDatabases', 'mode')
+ *
+ *  -- compact some tables (accept regular expression)
+ *  CALL compact_database('includingDatabases', 'mode', 'includingTables')
+ *
+ *  -- exclude some tables (accept regular expression)
+ *  CALL compact_database('includingDatabases', 'mode', 'includingTables', 
'excludingTables')
+ *
+ *  -- set table options ('k=v,...')
+ *  CALL compact_database('includingDatabases', 'mode', 'includingTables', 
'excludingTables', 'tableOptions')
+ * </code></pre>
+ */
+public class CompactDatabaseProcedure extends ProcedureBase {
+
+    /** Name under which this procedure is registered. */
+    public static final String NAME = "compact_database";
+
+    public CompactDatabaseProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    /** Variant with every optional argument left at its {@code ''} placeholder. */
+    public String[] call(ProcedureContext procedureContext) throws Exception {
+        return call(procedureContext, "");
+    }
+
+    /** Variant that only restricts the databases; the remaining arguments are {@code ''}. */
+    public String[] call(ProcedureContext procedureContext, String includingDatabases)
+            throws Exception {
+        return call(procedureContext, includingDatabases, "");
+    }
+
+    /** Variant that additionally sets the compact mode; the remaining arguments are {@code ''}. */
+    public String[] call(ProcedureContext procedureContext, String includingDatabases, String mode)
+            throws Exception {
+        return call(procedureContext, includingDatabases, mode, "");
+    }
+
+    /** Variant that additionally restricts the tables; the remaining arguments are {@code ''}. */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String includingDatabases,
+            String mode,
+            String includingTables)
+            throws Exception {
+        return call(procedureContext, includingDatabases, mode, includingTables, "");
+    }
+
+    /** Variant that additionally excludes tables; {@code tableOptions} is {@code ''}. */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String includingDatabases,
+            String mode,
+            String includingTables,
+            String excludingTables)
+            throws Exception {
+        return call(
+                procedureContext, includingDatabases, mode, includingTables, excludingTables, "");
+    }
+
+    /**
+     * Builds a {@link CompactDatabaseAction} from the arguments and hands it to {@code execute}.
+     *
+     * <p>Blank filter and mode arguments reach the action as {@code null} (see {@code nullable});
+     * blank {@code tableOptions} are not applied to the action at all.
+     *
+     * @param tableOptions table options in {@code 'k=v,...'} form, or blank for none
+     * @return whatever {@code execute} returns for the built action
+     */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String includingDatabases,
+            String mode,
+            String includingTables,
+            String excludingTables,
+            String tableOptions)
+            throws Exception {
+        // The action is created from the warehouse and options of the backing catalog.
+        AbstractCatalog abstractCatalog = (AbstractCatalog) catalog;
+        String warehouse = abstractCatalog.warehouse();
+        Map<String, String> catalogOptions = abstractCatalog.options();
+
+        CompactDatabaseAction action =
+                new CompactDatabaseAction(warehouse, catalogOptions)
+                        .includingDatabases(nullable(includingDatabases))
+                        .includingTables(nullable(includingTables))
+                        .excludingTables(nullable(excludingTables))
+                        .withDatabaseCompactMode(nullable(mode));
+
+        boolean hasTableOptions = !StringUtils.isBlank(tableOptions);
+        if (hasTableOptions) {
+            action.withTableOptions(parseCommaSeparatedKeyValues(tableOptions));
+        }
+
+        return execute(procedureContext, action, "Compact database job");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index cb8aa3162..e4970e6f8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -24,32 +24,30 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.CompactAction;
 import org.apache.paimon.flink.action.SortCompactAction;
 
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.procedure.ProcedureContext;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;
-
 /**
  * Compact procedure. Usage:
  *
  * <pre><code>
+ *  -- NOTE: use '' as placeholder for optional arguments
+ *
  *  -- compact a table (tableId should be 'database_name.table_name')
  *  CALL compact('tableId')
  *
  *  -- compact a table with sorting
- *  CALL compact('tableId', 'order-strategy', 'order-by-columns')
+ *  CALL compact('tableId', 'orderStrategy', 'orderByColumns')
  *
  *  -- compact specific partitions ('pt1=A,pt2=a', 'pt1=B,pt2=b', ...)
- *  -- NOTE: if you don't need sorting but you want specify partitions, use '' 
as placeholder
  *  CALL compact('tableId', '', '', partition1, partition2, ...)
  * </code></pre>
  */
 public class CompactProcedure extends ProcedureBase {
 
+    public static final String NAME = "compact";
+
     public CompactProcedure(Catalog catalog) {
         super(catalog);
     }
@@ -103,16 +101,9 @@ public class CompactProcedure extends ProcedureBase {
         }
 
         if (partitionStrings.length != 0) {
-            List<Map<String, String>> partitions = new ArrayList<>();
-            for (String partition : partitionStrings) {
-                partitions.add(parseCommaSeparatedKeyValues(partition));
-            }
-            action.withPartitions(partitions);
+            action.withPartitions(getPartitions(partitionStrings));
         }
 
-        StreamExecutionEnvironment env = 
procedureContext.getExecutionEnvironment();
-        action.build(env);
-
-        return execute(env, jobName);
+        return execute(procedureContext, action, jobName);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
new file mode 100644
index 000000000..0a672046d
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Create tag procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL create_tag('tableId', 'tagName', snapshotId)
+ * </code></pre>
+ */
+public class CreateTagProcedure extends ProcedureBase {
+
+    /** Name under which this procedure is registered. */
+    public static final String NAME = "create_tag";
+
+    public CreateTagProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    /**
+     * Looks up the table named by {@code tableId} in the catalog and creates tag {@code tagName}
+     * on it for snapshot {@code snapshotId}.
+     *
+     * @return a single-element array containing {@code "Success"}
+     * @throws Catalog.TableNotExistException declared by the catalog lookup
+     */
+    public String[] call(
+            ProcedureContext procedureContext, String tableId, String tagName, long snapshotId)
+            throws Catalog.TableNotExistException {
+        Identifier identifier = Identifier.fromString(tableId);
+        Table target = catalog.getTable(identifier);
+        target.createTag(tagName, snapshotId);
+
+        String[] result = {"Success"};
+        return result;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
new file mode 100644
index 000000000..83a99e7a3
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Delete tag procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL delete_tag('tableId', 'tagName')
+ * </code></pre>
+ */
+public class DeleteTagProcedure extends ProcedureBase {
+
+    /** Name under which this procedure is registered. */
+    public static final String NAME = "delete_tag";
+
+    public DeleteTagProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    /**
+     * Looks up the table named by {@code tableId} in the catalog and deletes tag {@code tagName}
+     * from it.
+     *
+     * @return a single-element array containing {@code "Success"}
+     * @throws Catalog.TableNotExistException declared by the catalog lookup
+     */
+    public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
+            throws Catalog.TableNotExistException {
+        Identifier identifier = Identifier.fromString(tableId);
+        Table target = catalog.getTable(identifier);
+        target.deleteTag(tagName);
+
+        String[] result = {"Success"};
+        return result;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
new file mode 100644
index 000000000..26d8f2919
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.operation.FileStoreCommit;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.UUID;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Drop partition procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL drop_partition('tableId', 'partition1', 'partition2', ...)
+ * </code></pre>
+ */
+public class DropPartitionProcedure extends ProcedureBase {
+
+    /** Name under which this procedure is registered. */
+    public static final String NAME = "drop_partition";
+
+    public DropPartitionProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    /**
+     * Drops the given partitions of the table named by {@code tableId} through a new {@link
+     * FileStoreCommit} created with a random commit user.
+     *
+     * @param partitionStrings at least one partition spec; each is parsed by {@code getPartitions}
+     * @return a single-element array containing {@code "Success"}
+     * @throws Catalog.TableNotExistException declared by the catalog lookup
+     */
+    public String[] call(
+            ProcedureContext procedureContext, String tableId, String... partitionStrings)
+            throws Catalog.TableNotExistException {
+        boolean hasPartitions = partitionStrings.length > 0;
+        checkArgument(hasPartitions, "drop-partition procedure must specify partitions.");
+
+        Identifier identifier = Identifier.fromString(tableId);
+        // The catalog returns a generic table; the commit API used below needs FileStoreTable.
+        FileStoreTable target = (FileStoreTable) catalog.getTable(identifier);
+
+        String commitUser = UUID.randomUUID().toString();
+        FileStoreCommit partitionCommit = target.store().newCommit(commitUser);
+        partitionCommit.dropPartitions(
+                getPartitions(partitionStrings), BatchWriteBuilder.COMMIT_IDENTIFIER);
+
+        String[] result = {"Success"};
+        return result;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
new file mode 100644
index 000000000..6563595e6
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.MergeIntoAction;
+import org.apache.paimon.flink.action.MergeIntoActionFactory;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_DELETE;
+import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_UPSERT;
+import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_DELETE;
+import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_UPSERT;
+import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_INSERT;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Merge Into procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL merge_into(
+ *      'targetTableId',
+ *      'targetAlias',
+ *      'sourceSqls', -- separate with ';'
+ *      'sourceTable',
+ *      'mergeCondition',
+ *      'mergeActions',
+ *      -- arguments for merge actions
+ *      ...
+ *      )
+ *
+ *  -- merge actions and corresponding arguments
+ *  matched-upsert: condition, set
+ *  not-matched-by-source-upsert: condition, set
+ *  matched-delete: condition
+ *  not-matched-by-source-delete: condition
+ *  not-matched-insert: condition, insertValues
+ *
+ *  -- NOTE: the arguments should be in the order of merge actions
+ *  -- and use '' as placeholder for optional arguments
+ * </code></pre>
+ *
+ * <p>This procedure will be forced to use batch environments.
+ */
+public class MergeIntoProcedure extends ProcedureBase {
+
+    /** Name under which this procedure is registered. */
+    public static final String NAME = "merge_into";
+
+    public MergeIntoProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    /**
+     * Configures a {@link MergeIntoAction} from the procedure arguments, validates it and runs it.
+     *
+     * @param procedureContext context supplied by the caller; not read by this method
+     * @param targetTableId target table id, parsed with {@link Identifier#fromString}
+     * @param targetAlias alias for the target table; blank is treated as "no alias"
+     * @param sourceSqls statements separated by ';'; an empty string means none
+     * @param sourceTable source table; must not be empty
+     * @param mergeCondition merge condition; must not be empty
+     * @param mergeActions comma-separated merge action names; must not be empty
+     * @param mergeActionArguments arguments of the merge actions, in the order the actions are
+     *     listed: (condition, set/values) for upsert and insert actions, (condition) for delete
+     *     actions; blank values are treated as absent
+     * @return a single-element array containing {@code "Success"}
+     * @throws IllegalArgumentException presumably raised by {@code checkArgument} when a required
+     *     argument is empty or the argument count does not match the actions (confirm against
+     *     {@code Preconditions})
+     * @throws UnsupportedOperationException if {@code mergeActions} contains an unknown name
+     */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String targetTableId,
+            String targetAlias,
+            String sourceSqls,
+            String sourceTable,
+            String mergeCondition,
+            String mergeActions,
+            String... mergeActionArguments)
+            throws Exception {
+        String warehouse = ((AbstractCatalog) catalog).warehouse();
+        Map<String, String> catalogOptions = ((AbstractCatalog) catalog).options();
+        Identifier identifier = Identifier.fromString(targetTableId);
+        MergeIntoAction action =
+                new MergeIntoAction(
+                        warehouse,
+                        identifier.getDatabaseName(),
+                        identifier.getObjectName(),
+                        catalogOptions);
+        action.withTargetAlias(nullable(targetAlias));
+
+        if (!sourceSqls.isEmpty()) {
+            action.withSourceSqls(sourceSqls.split(";"));
+        }
+
+        checkArgument(!sourceTable.isEmpty(), "Must specify source table.");
+        action.withSourceTable(sourceTable);
+
+        checkArgument(!mergeCondition.isEmpty(), "Must specify merge condition.");
+        action.withMergeCondition(mergeCondition);
+
+        checkArgument(!mergeActions.isEmpty(), "Must specify at least one merge action.");
+        List<String> actions =
+                Arrays.stream(mergeActions.split(","))
+                        .map(String::trim)
+                        .collect(Collectors.toList());
+        // Rejects unknown action names and a wrong argument count before any argument is read,
+        // so the index arithmetic below cannot run past the end of mergeActionArguments.
+        validateActions(actions, mergeActionArguments.length);
+
+        int index = 0;
+        for (String mergeAction : actions) {
+            String condition;
+            String setting;
+            switch (mergeAction) {
+                case MATCHED_UPSERT:
+                case NOT_MATCHED_BY_SOURCE_UPSERT:
+                case NOT_MATCHED_INSERT:
+                    // These actions consume two arguments: optional condition, mandatory setting.
+                    condition = nullable(mergeActionArguments[index++]);
+                    setting = nullable(mergeActionArguments[index++]);
+                    checkNotNull(setting, "%s must set the second argument", mergeAction);
+                    setMergeAction(action, mergeAction, condition, setting);
+                    break;
+                case MATCHED_DELETE:
+                case NOT_MATCHED_BY_SOURCE_DELETE:
+                    // Delete actions consume a single, optional condition argument.
+                    condition = nullable(mergeActionArguments[index++]);
+                    setMergeAction(action, mergeAction, condition);
+                    break;
+                default:
+                    // Defensive only: validateActions has already rejected unknown names.
+                    // Report the offending action name, not the MergeIntoAction object.
+                    throw new UnsupportedOperationException("Unknown merge action: " + mergeAction);
+            }
+        }
+
+        MergeIntoActionFactory.validate(action);
+
+        // TODO set dml-sync argument to action
+        action.run();
+
+        return new String[] {"Success"};
+    }
+
+    /**
+     * Checks that every name in {@code mergeActions} is a known merge action and that {@code
+     * argumentLength} equals the total number of arguments those actions require (two for upsert
+     * and insert actions, one for delete actions).
+     */
+    private void validateActions(List<String> mergeActions, int argumentLength) {
+        int expectedArguments = 0;
+        for (String action : mergeActions) {
+            switch (action) {
+                case MATCHED_UPSERT:
+                case NOT_MATCHED_BY_SOURCE_UPSERT:
+                case NOT_MATCHED_INSERT:
+                    expectedArguments += 2;
+                    break;
+                case MATCHED_DELETE:
+                case NOT_MATCHED_BY_SOURCE_DELETE:
+                    expectedArguments += 1;
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Unknown merge action: " + action);
+            }
+        }
+
+        checkArgument(
+                expectedArguments == argumentLength,
+                "Expected %s action arguments but given '%s'",
+                expectedArguments,
+                argumentLength);
+    }
+
+    /**
+     * Applies one merge action to {@code action}. {@code arguments} holds (condition, setting) for
+     * upsert and insert actions and (condition) for delete actions.
+     */
+    private void setMergeAction(MergeIntoAction action, String mergeAction, String... arguments) {
+        switch (mergeAction) {
+            case MATCHED_UPSERT:
+                action.withMatchedUpsert(arguments[0], arguments[1]);
+                return;
+            case NOT_MATCHED_BY_SOURCE_UPSERT:
+                action.withNotMatchedBySourceUpsert(arguments[0], arguments[1]);
+                return;
+            case NOT_MATCHED_INSERT:
+                action.withNotMatchedInsert(arguments[0], arguments[1]);
+                return;
+            case MATCHED_DELETE:
+                action.withMatchedDelete(arguments[0]);
+                return;
+            case NOT_MATCHED_BY_SOURCE_DELETE:
+                action.withNotMatchedBySourceDelete(arguments[0]);
+                return;
+            default:
+                throw new UnsupportedOperationException("Unknown merge action: " + mergeAction);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
index c44fe2701..1faa6445e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -19,16 +19,26 @@
 package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
+import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.procedure.ProcedureContext;
 import org.apache.flink.table.procedures.Procedure;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
+import static 
org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;
 
 /** Base implementation for flink {@link Procedure}. */
 public class ProcedureBase implements Procedure {
@@ -39,8 +49,25 @@ public class ProcedureBase implements Procedure {
         this.catalog = catalog;
     }
 
-    protected String[] execute(StreamExecutionEnvironment env, String 
defaultJobName)
+    /**
+     * Parses every partition string with {@code parseCommaSeparatedKeyValues}.
+     *
+     * @param partitionStrings partition specs to parse
+     * @return one map per input string, in input order
+     */
+    protected List<Map<String, String>> getPartitions(String... partitionStrings) {
+        List<Map<String, String>> parsed = new ArrayList<>();
+        for (int i = 0; i < partitionStrings.length; i++) {
+            parsed.add(parseCommaSeparatedKeyValues(partitionStrings[i]));
+        }
+        return parsed;
+    }
+
+    /**
+     * Converts a placeholder argument to {@code null}: returns {@code null} when {@code
+     * StringUtils.isBlank(arg)} holds, otherwise returns {@code arg} unchanged.
+     */
+    @Nullable
+    protected String nullable(String arg) {
+        if (StringUtils.isBlank(arg)) {
+            return null;
+        }
+        return arg;
+    }
+
+    protected String[] execute(
+            ProcedureContext procedureContext, Action action, String 
defaultJobName)
             throws Exception {
+        StreamExecutionEnvironment env = 
procedureContext.getExecutionEnvironment();
+        action.build(env);
+
         ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
         String name = 
conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
         JobClient jobClient = env.executeAsync(name);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
index e90450afc..98fbcda5f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.flink.action.CompactActionFactory;
 
 import org.apache.flink.table.procedures.Procedure;
 
@@ -36,7 +35,14 @@ public class ProcedureUtil {
     private static final List<String> SYSTEM_PROCEDURES = new ArrayList<>();
 
     static {
-        SYSTEM_PROCEDURES.add(CompactActionFactory.IDENTIFIER);
+        SYSTEM_PROCEDURES.add(CompactProcedure.NAME);
+        SYSTEM_PROCEDURES.add(CompactDatabaseProcedure.NAME);
+        SYSTEM_PROCEDURES.add(CreateTagProcedure.NAME);
+        SYSTEM_PROCEDURES.add(DeleteTagProcedure.NAME);
+        SYSTEM_PROCEDURES.add(DropPartitionProcedure.NAME);
+        SYSTEM_PROCEDURES.add(MergeIntoProcedure.NAME);
+        SYSTEM_PROCEDURES.add(ResetConsumerProcedure.NAME);
+        SYSTEM_PROCEDURES.add(RollbackToProcedure.NAME);
     }
 
     public static List<String> listProcedures() {
@@ -45,8 +51,22 @@ public class ProcedureUtil {
 
     public static Optional<Procedure> getProcedure(Catalog catalog, String 
procedureName) {
         switch (procedureName) {
-            case CompactActionFactory.IDENTIFIER:
+            case CompactProcedure.NAME:
                 return Optional.of(new CompactProcedure(catalog));
+            case CompactDatabaseProcedure.NAME:
+                return Optional.of(new CompactDatabaseProcedure(catalog));
+            case CreateTagProcedure.NAME:
+                return Optional.of(new CreateTagProcedure(catalog));
+            case DeleteTagProcedure.NAME:
+                return Optional.of(new DeleteTagProcedure(catalog));
+            case DropPartitionProcedure.NAME:
+                return Optional.of(new DropPartitionProcedure(catalog));
+            case MergeIntoProcedure.NAME:
+                return Optional.of(new MergeIntoProcedure(catalog));
+            case ResetConsumerProcedure.NAME:
+                return Optional.of(new ResetConsumerProcedure(catalog));
+            case RollbackToProcedure.NAME:
+                return Optional.of(new RollbackToProcedure(catalog));
             default:
                 return Optional.empty();
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
new file mode 100644
index 000000000..70a409b0b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Reset consumer procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL reset_consumer('tableId', 'consumerId', nextSnapshotId)
+ * </code></pre>
+ */
+public class ResetConsumerProcedure extends ProcedureBase {
+
+    public static final String NAME = "reset_consumer";
+
+    public ResetConsumerProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String consumerId,
+            long nextSnapshotId)
+            throws Catalog.TableNotExistException {
+        FileStoreTable fileStoreTable =
+                (FileStoreTable) 
catalog.getTable(Identifier.fromString(tableId));
+        ConsumerManager consumerManager =
+                new ConsumerManager(fileStoreTable.fileIO(), 
fileStoreTable.location());
+        consumerManager.resetConsumer(consumerId, new 
Consumer(nextSnapshotId));
+
+        return new String[] {"Success"};
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
new file mode 100644
index 000000000..b8d6dd38c
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Rollback procedure. Usage:
+ *
+ * <pre><code>
+ *  -- rollback to a snapshot
+ *  CALL rollback_to('tableId', snapshotId)
+ *
+ *  -- rollback to a tag
+ *  CALL rollback_to('tableId', 'tagName')
+ * </code></pre>
+ */
+public class RollbackToProcedure extends ProcedureBase {
+
+    public static final String NAME = "rollback_to";
+
+    public RollbackToProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    public String[] call(ProcedureContext procedureContext, String tableId, 
long snapshotId)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        table.rollbackTo(snapshotId);
+
+        return new String[] {"Success"};
+    }
+
+    public String[] call(ProcedureContext procedureContext, String tableId, 
String tagName)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        table.rollbackTo(tagName);
+
+        return new String[] {"Success"};
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 0e71d076f..fe48881d4 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -54,6 +54,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 /** {@link Action} test base. */
 public abstract class ActionITCaseBase extends AbstractTestBase {
@@ -145,7 +146,7 @@ public abstract class ActionITCaseBase extends 
AbstractTestBase {
     protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-        env.setParallelism(2);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
 
         if (isStreaming) {
             env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
@@ -158,6 +159,11 @@ public abstract class ActionITCaseBase extends 
AbstractTestBase {
         return env;
     }
 
+    protected void callProcedure(String procedureStatement) {
+        // default execution mode: streaming, dmlSync disabled
+        callProcedure(procedureStatement, true, false);
+    }
+
     protected void callProcedure(String procedureStatement, boolean 
isStreaming, boolean dmlSync) {
         StreamExecutionEnvironment env = buildDefaultEnv(isStreaming);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index ce2e72641..492d98434 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -38,15 +38,14 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommonTestUtils;
 import org.apache.paimon.utils.SnapshotManager;
 
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -95,8 +94,6 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.WRITE_ONLY.key(), "true");
 
-        Map<String, String> tableOptions = new HashMap<>();
-
         List<FileStoreTable> tables = new ArrayList<>();
 
         for (String dbName : DATABASE_NAMES) {
@@ -131,27 +128,16 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
             }
         }
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
-        if (mode.equals("divided")) {
-            new CompactDatabaseAction(warehouse, new HashMap<>())
-                    .includingDatabases(null)
-                    .includingTables(null)
-                    .excludingTables(null)
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            StreamExecutionEnvironment env = buildDefaultEnv(false);
+            new CompactDatabaseAction(warehouse, Collections.emptyMap())
+                    .withDatabaseCompactMode(mode)
                     .build(env);
+            env.execute();
         } else {
-            new CompactDatabaseAction(warehouse, new HashMap<>())
-                    .includingDatabases(null)
-                    .includingTables(null)
-                    .excludingTables(null)
-                    .withDatabaseCompactMode("combined")
-                    .withTableOptions(tableOptions)
-                    .build(env);
+            callProcedure(String.format("CALL compact_database('', '%s')", 
mode), false, true);
         }
 
-        env.execute();
-
         for (FileStoreTable table : tables) {
             SnapshotManager snapshotManager = table.snapshotManager();
             Snapshot snapshot =
@@ -181,11 +167,6 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
         options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
         options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
 
-        Map<String, String> tableOptions = new HashMap<>();
-        // if CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() use default 
value, the cost time in
-        // combined mode will be over 1mins
-        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), 
"1s");
-
         List<FileStoreTable> tables = new ArrayList<>();
         for (String dbName : DATABASE_NAMES) {
             for (String tableName : TABLE_NAMES) {
@@ -220,27 +201,31 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
             }
         }
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-        
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-        env.getCheckpointConfig().setCheckpointInterval(500);
-        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
-        if (mode.equals("divided")) {
-            new CompactDatabaseAction(warehouse, new HashMap<>())
-                    .includingDatabases(null)
-                    .includingTables(null)
-                    .excludingTables(null)
-                    .build(env);
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            StreamExecutionEnvironment env = buildDefaultEnv(true);
+            if (mode.equals("divided")) {
+                new CompactDatabaseAction(warehouse, new 
HashMap<>()).build(env);
+            } else {
+                // if CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() uses the 
default value, the
+                // time cost in combined mode will be over 1 min
+                new CompactDatabaseAction(warehouse, new HashMap<>())
+                        .withDatabaseCompactMode("combined")
+                        .withTableOptions(
+                                Collections.singletonMap(
+                                        
CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"))
+                        .build(env);
+            }
+            env.executeAsync();
         } else {
-            new CompactDatabaseAction(warehouse, new HashMap<>())
-                    .includingDatabases(null)
-                    .includingTables(null)
-                    .excludingTables(null)
-                    .withDatabaseCompactMode("combined")
-                    .withTableOptions(tableOptions)
-                    .build(env);
+            if (mode.equals("divided")) {
+                callProcedure("CALL compact_database()", true, false);
+            } else {
+                callProcedure(
+                        "CALL compact_database('', 'combined', '', '', 
'continuous.discovery-interval=1s')",
+                        true,
+                        false);
+            }
         }
-        JobClient client = env.executeAsync();
 
         for (FileStoreTable table : tables) {
             StreamTableScan scan = table.newReadBuilder().newStreamScan();
@@ -381,8 +366,6 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
                                 "Cannot validate snapshot expiration in %s 
milliseconds.", 60_000));
             }
         }
-
-        client.cancel();
     }
 
     @ParameterizedTest(name = "mode = {0}")
@@ -437,9 +420,6 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.WRITE_ONLY.key(), "true");
 
-        Map<String, String> tableOptions = new HashMap<>();
-        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), 
"1s");
-
         List<FileStoreTable> compactionTables = new ArrayList<>();
         List<FileStoreTable> noCompactionTables = new ArrayList<>();
 
@@ -480,26 +460,37 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
             }
         }
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
-
-        if (mode.equals("divided")) {
-            new CompactDatabaseAction(warehouse, new HashMap<>())
-                    .includingDatabases(null)
-                    .includingTables(includingPattern)
-                    .excludingTables(excludesPattern)
-                    .build(env);
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            StreamExecutionEnvironment env = buildDefaultEnv(false);
+            CompactDatabaseAction action =
+                    new CompactDatabaseAction(warehouse, 
Collections.emptyMap())
+                            .includingTables(includingPattern)
+                            .excludingTables(excludesPattern)
+                            .withDatabaseCompactMode(mode);
+            if (mode.equals("combined")) {
+                action.withTableOptions(
+                        Collections.singletonMap(
+                                
CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"));
+            }
+            action.build(env);
+            env.execute();
         } else {
-            new CompactDatabaseAction(warehouse, new HashMap<>())
-                    .includingDatabases(null)
-                    .includingTables(includingPattern)
-                    .excludingTables(excludesPattern)
-                    .withDatabaseCompactMode("combined")
-                    .withTableOptions(tableOptions)
-                    .build(env);
+            if (mode.equals("divided")) {
+                callProcedure(
+                        String.format(
+                                "CALL compact_database('', 'divided', '%s', 
'%s')",
+                                nonNull(includingPattern), 
nonNull(excludesPattern)),
+                        false,
+                        true);
+            } else {
+                callProcedure(
+                        String.format(
+                                "CALL compact_database('', 'combined', '%s', 
'%s', 'continuous.discovery-interval=1s')",
+                                nonNull(includingPattern), 
nonNull(excludesPattern)),
+                        false,
+                        true);
+            }
         }
-        env.execute();
 
         for (FileStoreTable table : compactionTables) {
             SnapshotManager snapshotManager = table.snapshotManager();
@@ -532,6 +523,10 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
         }
     }
 
+    private String nonNull(@Nullable String s) {
+        return s == null ? "" : s;
+    }
+
     @Test
     public void testUnawareBucketStreamingCompact() throws Exception {
         Map<String, String> options = new HashMap<>();
@@ -573,17 +568,13 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
             
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
         }
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-        
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-        env.getCheckpointConfig().setCheckpointInterval(500);
-        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
-        new CompactDatabaseAction(warehouse, new HashMap<>())
-                .includingDatabases(null)
-                .includingTables(null)
-                .excludingTables(null)
-                .build(env);
-        JobClient client = env.executeAsync();
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            StreamExecutionEnvironment env = buildDefaultEnv(true);
+            new CompactDatabaseAction(warehouse, new HashMap<>()).build(env);
+            env.executeAsync();
+        } else {
+            callProcedure("CALL compact_database()");
+        }
 
         for (FileStoreTable table : tables) {
             StreamWriteBuilder streamWriteBuilder =
@@ -602,8 +593,6 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
             // second compaction, snapshot will be 5
             checkFileAndRowSize(table, 5L, 30_000L, 1, 9);
         }
-
-        client.cancel().get();
     }
 
     @Test
@@ -646,15 +635,13 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
             
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
         }
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
-        new CompactDatabaseAction(warehouse, new HashMap<>())
-                .includingDatabases(null)
-                .includingTables(null)
-                .excludingTables(null)
-                .build(env);
-        env.execute();
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            StreamExecutionEnvironment env = buildDefaultEnv(false);
+            new CompactDatabaseAction(warehouse, new HashMap<>()).build(env);
+            env.execute();
+        } else {
+            callProcedure("CALL compact_database()", false, true);
+        }
 
         for (FileStoreTable table : tables) {
             // first compaction, snapshot will be 3.
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index e33b3f42d..444d2f71e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
@@ -80,11 +81,14 @@ public class ConsumerActionITCase extends ActionITCaseBase {
         assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);
 
         // reset consumer
-        ResetConsumerAction resetConsumerAction =
-                new ResetConsumerAction(
-                        warehouse, database, tableName, 
Collections.emptyMap(), "myid", 1);
-        resetConsumerAction.run();
-
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            new ResetConsumerAction(
+                            warehouse, database, tableName, 
Collections.emptyMap(), "myid", 1)
+                    .run();
+        } else {
+            callProcedure(
+                    String.format("CALL reset_consumer('%s.%s', 'myid', 1)", 
database, tableName));
+        }
         Optional<Consumer> consumer2 = consumerManager.consumer("myid");
         assertThat(consumer2).isPresent();
         assertThat(consumer2.get().nextSnapshot()).isEqualTo(1);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
index e477adfed..ca22dfcf4 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
@@ -37,6 +37,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -54,13 +55,19 @@ public class DropPartitionActionITCase extends 
ActionITCaseBase {
     public void testDropPartitionWithSinglePartitionKey(boolean hasPk) throws 
Exception {
         FileStoreTable table = prepareTable(hasPk);
 
-        new DropPartitionAction(
-                        warehouse,
-                        database,
-                        tableName,
-                        
Collections.singletonList(Collections.singletonMap("partKey0", "0")),
-                        Collections.emptyMap())
-                .run();
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            new DropPartitionAction(
+                            warehouse,
+                            database,
+                            tableName,
+                            
Collections.singletonList(Collections.singletonMap("partKey0", "0")),
+                            Collections.emptyMap())
+                    .run();
+        } else {
+            callProcedure(
+                    String.format(
+                            "CALL drop_partition('%s.%s', 'partKey0 = 0')", 
database, tableName));
+        }
 
         SnapshotManager snapshotManager = 
getFileStoreTable(tableName).snapshotManager();
         Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
@@ -106,13 +113,20 @@ public class DropPartitionActionITCase extends 
ActionITCaseBase {
         partitions1.put("partKey0", "1");
         partitions1.put("partKey1", "0");
 
-        new DropPartitionAction(
-                        warehouse,
-                        database,
-                        tableName,
-                        Arrays.asList(partitions0, partitions1),
-                        Collections.emptyMap())
-                .run();
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            new DropPartitionAction(
+                            warehouse,
+                            database,
+                            tableName,
+                            Arrays.asList(partitions0, partitions1),
+                            Collections.emptyMap())
+                    .run();
+        } else {
+            callProcedure(
+                    String.format(
+                            "CALL drop_partition('%s.%s', 
'partKey0=0,partKey1=1', 'partKey0=1,partKey1=0')",
+                            database, tableName));
+        }
 
         SnapshotManager snapshotManager = 
getFileStoreTable(tableName).snapshotManager();
         Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index d562f5160..c3ef32450 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -36,9 +36,15 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_DELETE;
+import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_UPSERT;
+import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_DELETE;
+import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_UPSERT;
+import static 
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_INSERT;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildDdl;
 import static 
org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
@@ -110,8 +116,31 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
                         "dt < '02-28'", "v = v || '_nmu', last_action = 
'not_matched_upsert'")
                 .withNotMatchedBySourceDelete("dt >= '02-28'");
 
+        String mergeActions =
+                String.format(
+                        "%s,%s,%s,%s,%s",
+                        MATCHED_UPSERT,
+                        MATCHED_DELETE,
+                        NOT_MATCHED_INSERT,
+                        NOT_MATCHED_BY_SOURCE_UPSERT,
+                        NOT_MATCHED_BY_SOURCE_DELETE);
+        String procedureStatement =
+                String.format(
+                        "CALL merge_into('%s.T', '', '', 'default.S', 'T.k = 
S.k AND T.dt = S.dt', '%s', %s)",
+                        database,
+                        mergeActions,
+                        "'T.v <> S.v AND S.v IS NOT NULL', "
+                                + "'v = S.v, last_action = 
''matched_upsert''', "
+                                + "'S.v IS NULL', "
+                                + "'', "
+                                + "'S.k, S.v, ''insert'', S.dt', "
+                                + "'dt < ''02-28''', "
+                                + "'v = v || ''_nmu'', last_action = 
''not_matched_upsert''', "
+                                + "'dt >= ''02-28'''");
+
         validateActionRunResult(
                 action,
+                procedureStatement,
                 expected,
                 Arrays.asList(
                         changelogRow("+I", 1, "v_1", "creation", "02-27"),
@@ -169,8 +198,14 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
                 .withMergeCondition("TT.k = S.k AND TT.dt = S.dt")
                 .withMatchedDelete("S.v IS NULL");
 
+        String procedureStatement =
+                String.format(
+                        "CALL merge_into('%s.T', 'TT', '', 'S', 'TT.k = S.k 
AND TT.dt = S.dt', '%s', %s)",
+                        inDefault ? database : "test_db", MATCHED_DELETE, 
"'S.v IS NULL'");
+
         validateActionRunResult(
                 action,
+                procedureStatement,
                 Arrays.asList(
                         changelogRow("-D", 4, "v_4", "creation", "02-27"),
                         changelogRow("-D", 8, "v_8", "creation", "02-28")),
@@ -205,6 +240,11 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
                 .withMergeCondition("T.k = S.k AND T.dt = S.dt")
                 .withMatchedDelete("S.v IS NULL");
 
+        String procedureStatement =
+                String.format(
+                        "CALL merge_into('default.T', '', '', '%s', 'T.k = S.k 
AND T.dt = S.dt', '%s', %s)",
+                        sourceTableName, MATCHED_DELETE, "'S.v IS NULL'");
+
         if (!inDefault) {
             sEnv.executeSql("USE `default`");
             bEnv.executeSql("USE `default`");
@@ -212,6 +252,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
 
         validateActionRunResult(
                 action,
+                procedureStatement,
                 Arrays.asList(
                         changelogRow("-D", 4, "v_4", "creation", "02-27"),
                         changelogRow("-D", 8, "v_8", "creation", "02-28")),
@@ -236,6 +277,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
                 String.format(
                         "CREATE CATALOG test_cat WITH ('type' = 'paimon', 
'warehouse' = '%s')",
                         getTempDirPath());
+        String escapeCatalog = catalog.replaceAll("'", "''");
         String id =
                 TestValuesTableFactory.registerData(
                         Arrays.asList(
@@ -245,8 +287,9 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
         String ddl =
                 String.format(
                         "CREATE TEMPORARY TABLE %s (k INT, v STRING, dt 
STRING)\n"
-                                + "WITH ('connector' = 'values', 'bounded' = 
'true', 'data-id' = '%s');",
+                                + "WITH ('connector' = 'values', 'bounded' = 
'true', 'data-id' = '%s')",
                         useCatalog ? "S" : "test_cat.`default`.S", id);
+        String escapeDdl = ddl.replaceAll("'", "''");
 
         MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
 
@@ -261,8 +304,22 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
 
         action.withMergeCondition("T.k = S.k AND T.dt = 
S.dt").withMatchedDelete("S.v IS NULL");
 
+        String procedureStatement =
+                String.format(
+                        "CALL merge_into('%s.T', '', '%s', '%s', 'T.k = S.k 
AND T.dt = S.dt', '%s', %s)",
+                        database,
+                        useCatalog
+                                ? String.format(
+                                        "%s;%s;%s",
+                                        escapeCatalog, "USE CATALOG test_cat", 
escapeDdl)
+                                : String.format("%s;%s", escapeCatalog, 
escapeDdl),
+                        useCatalog ? "S" : "test_cat.default.S",
+                        MATCHED_DELETE,
+                        "'S.v IS NULL'");
+
         validateActionRunResult(
                 action,
+                procedureStatement,
                 Arrays.asList(
                         changelogRow("-D", 4, "v_4", "creation", "02-27"),
                         changelogRow("-D", 8, "v_8", "creation", "02-28")),
@@ -287,8 +344,18 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
                 .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
                 .withMatchedUpsert(null, "*");
 
+        String procedureStatement =
+                String.format(
+                        "CALL merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k 
AND T.dt = SS.dt', '%s', %s)",
+                        database,
+                        "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', 
dt FROM S",
+                        qualified ? "default.SS" : "SS",
+                        MATCHED_UPSERT,
+                        "'', '*'");
+
         validateActionRunResult(
                 action,
+                procedureStatement,
                 Arrays.asList(
                         changelogRow("-U", 1, "v_1", "creation", "02-27"),
                         changelogRow("+U", 1, "v_1", "unknown", "02-27"),
@@ -321,8 +388,18 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
                 .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
                 .withNotMatchedInsert("SS.k < 12", "*");
 
+        String procedureStatement =
+                String.format(
+                        "CALL merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k 
AND T.dt = SS.dt', '%s', %s)",
+                        database,
+                        "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', 
dt FROM S",
+                        qualified ? "default.SS" : "SS",
+                        NOT_MATCHED_INSERT,
+                        "'SS.k < 12', '*'");
+
         validateActionRunResult(
                 action,
+                procedureStatement,
                 Arrays.asList(
                         changelogRow("+I", 8, "v_8", "unknown", "02-29"),
                         changelogRow("+I", 11, "v_11", "unknown", "02-29")),
@@ -416,11 +493,18 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
     }
 
     private void validateActionRunResult(
-            MergeIntoAction action, List<Row> streamingExpected, List<Row> 
batchExpected)
+            MergeIntoAction action,
+            String procedureStatement,
+            List<Row> streamingExpected,
+            List<Row> batchExpected)
             throws Exception {
         BlockingIterator<Row, Row> iterator =
                 testStreamingRead(buildSimpleQuery("T"), initialRecords);
-        action.run();
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            action.run();
+        } else {
+            callProcedure(procedureStatement);
+        }
         // test streaming read
         validateStreamingReadResult(iterator, streamingExpected);
         iterator.close();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
index 97c8637a5..e6f6cea6b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
 import static 
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
@@ -65,9 +66,11 @@ public class RollbackToActionITCase extends ActionITCaseBase 
{
         writeData(rowData(2L, BinaryString.fromString("World")));
         writeData(rowData(2L, BinaryString.fromString("Flink")));
 
-        RollbackToAction action =
-                new RollbackToAction(warehouse, database, tableName, "2", 
Collections.emptyMap());
-        action.run();
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            new RollbackToAction(warehouse, database, tableName, "2", 
Collections.emptyMap()).run();
+        } else {
+            callProcedure(String.format("CALL rollback_to('%s.%s', 2)", 
database, tableName));
+        }
 
         testBatchRead(
                 "SELECT * FROM `" + tableName + "`",
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
index d0f3e2879..4f44d672f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
 import static 
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
@@ -65,10 +66,13 @@ public class TagActionITCase extends ActionITCaseBase {
 
         TagManager tagManager = new TagManager(table.fileIO(), 
table.location());
 
-        CreateTagAction createTagAction =
-                new CreateTagAction(
-                        warehouse, database, tableName, 
Collections.emptyMap(), "tag2", 2);
-        createTagAction.run();
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            new CreateTagAction(warehouse, database, tableName, 
Collections.emptyMap(), "tag2", 2)
+                    .run();
+        } else {
+            callProcedure(
+                    String.format("CALL create_tag('%s.%s', 'tag2', 2)", 
database, tableName));
+        }
         assertThat(tagManager.tagExists("tag2")).isTrue();
 
         // read tag2
@@ -76,9 +80,12 @@ public class TagActionITCase extends ActionITCaseBase {
                 "SELECT * FROM `" + tableName + "` /*+ 
OPTIONS('scan.tag-name'='tag2') */",
                 Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Hello")));
 
-        DeleteTagAction deleteTagAction =
-                new DeleteTagAction(warehouse, database, tableName, 
Collections.emptyMap(), "tag2");
-        deleteTagAction.run();
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            new DeleteTagAction(warehouse, database, tableName, 
Collections.emptyMap(), "tag2")
+                    .run();
+        } else {
+            callProcedure(String.format("CALL delete_tag('%s.%s', 'tag2')", 
database, tableName));
+        }
         assertThat(tagManager.tagExists("tag2")).isFalse();
     }
 }

Reply via email to