This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f31cd18f2 [flink] Support compact procedure (#2013)
f31cd18f2 is described below

commit f31cd18f2a679c0b002a22b7222c00a256f93173
Author: yuzelin <[email protected]>
AuthorDate: Tue Sep 19 10:11:14 2023 +0800

    [flink] Support compact procedure (#2013)
---
 .github/workflows/e2e-tests-1.16-jdk11.yml         |   4 +-
 .../org/apache/paimon/catalog/AbstractCatalog.java |   6 +-
 .../apache/paimon/catalog/FileSystemCatalog.java   |   2 +-
 paimon-flink/paimon-flink-1.18/pom.xml             |  85 +++++++++
 .../action/cdc/kafka/KafkaSyncDatabaseAction.java  |   5 -
 .../action/cdc/kafka/KafkaSyncTableAction.java     |   5 -
 .../cdc/mongodb/MongoDBSyncDatabaseAction.java     |   5 -
 .../action/cdc/mongodb/MongoDBSyncTableAction.java |   5 -
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |   5 -
 .../action/cdc/mysql/MySqlSyncTableAction.java     |   5 -
 .../flink/action/cdc/CdcActionITCaseBase.java      |  21 ++-
 .../kafka/KafkaCanalSyncDatabaseActionITCase.java  |   3 +-
 .../cdc/kafka/KafkaCanalSyncTableActionITCase.java |   3 +-
 .../mongodb/MongoDBSyncDatabaseActionITCase.java   |   3 +-
 .../cdc/mongodb/MongoDBSyncTableActionITCase.java  |   3 +-
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   |   3 +-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |   3 +-
 .../java/org/apache/paimon/flink/FlinkCatalog.java |   2 +-
 .../org/apache/paimon/flink/action/ActionBase.java |  11 +-
 .../paimon/flink/action/CompactActionFactory.java  |  23 +--
 .../paimon/flink/action/SortCompactAction.java     |  14 +-
 .../paimon/flink/action/TableActionBase.java       |   2 -
 .../paimon/flink/procedure/CompactProcedure.java   | 118 ++++++++++++
 .../paimon/flink/procedure/ProcedureBase.java      |  59 ++++++
 .../paimon/flink/procedure/ProcedureUtil.java      |  19 +-
 .../paimon/flink/action/ActionITCaseBase.java      |  65 ++++++-
 .../paimon/flink/action/CompactActionITCase.java   | 210 ++++++++++-----------
 .../flink/action/CompactActionITCaseBase.java      |   6 +-
 .../flink/action/CompactDatabaseActionITCase.java  |  47 ++---
 .../paimon/flink/action/DeleteActionITCase.java    |   4 +-
 .../flink/action/DropPartitionActionITCase.java    |   5 +-
 .../flink/action/RollbackToActionITCase.java       |   2 -
 ...ionITCase.java => SortCompactActionITCase.java} |  68 +++----
 paimon-flink/pom.xml                               |   1 +
 .../java/org/apache/paimon/hive/HiveCatalog.java   |   2 +-
 35 files changed, 542 insertions(+), 282 deletions(-)

diff --git a/.github/workflows/e2e-tests-1.16-jdk11.yml b/.github/workflows/e2e-tests-1.16-jdk11.yml
index 9d83302bd..1ca1efdab 100644
--- a/.github/workflows/e2e-tests-1.16-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.16-jdk11.yml
@@ -51,7 +51,7 @@ jobs:
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          mvn -T 1C -B clean install -DskipTests
-          mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+          mvn -T 1C -B clean install -DskipTests -Pflink-1.16
+          mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.16
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 6d212e908..29cd15747 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -157,7 +157,11 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
-    protected abstract String warehouse();
+    public abstract String warehouse();
+
+    public Map<String, String> options() {
+        return catalogOptions;
+    }
 
     protected abstract TableSchema getDataTableSchema(Identifier identifier)
             throws TableNotExistException;
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index e8468cc1b..a9dfb6e29 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -231,7 +231,7 @@ public class FileSystemCatalog extends AbstractCatalog {
     public void close() throws Exception {}
 
     @Override
-    protected String warehouse() {
+    public String warehouse() {
         return warehouse.toString();
     }
 }
diff --git a/paimon-flink/paimon-flink-1.18/pom.xml b/paimon-flink/paimon-flink-1.18/pom.xml
new file mode 100644
index 000000000..2df59e0ea
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.paimon</groupId>
+        <artifactId>paimon-flink</artifactId>
+        <version>0.6-SNAPSHOT</version>
+    </parent>
+
+    <packaging>jar</packaging>
+
+    <artifactId>paimon-flink-1.18</artifactId>
+    <name>Paimon : Flink : 1.18</name>
+
+    <properties>
+        <flink.version>1.18-SNAPSHOT</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-cdc</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-paimon</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes combine.children="append">
+                                    <include>org.apache.paimon:paimon-flink-common</include>
+                                    <include>org.apache.paimon:paimon-flink-cdc</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 155b0b5ea..3ad479a68 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -202,11 +202,6 @@ public class KafkaSyncDatabaseAction extends ActionBase {
         return tableConfig;
     }
 
-    @VisibleForTesting
-    public Map<String, String> catalogConfig() {
-        return catalogConfig;
-    }
-
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index d8669eba3..0b362d4fb 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -203,11 +203,6 @@ public class KafkaSyncTableAction extends ActionBase {
         return tableConfig;
     }
 
-    @VisibleForTesting
-    public Map<String, String> catalogConfig() {
-        return catalogConfig;
-    }
-
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index f6043824c..ea4d7a850 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -184,11 +184,6 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
         return tableConfig;
     }
 
-    @VisibleForTesting
-    public Map<String, String> catalogConfig() {
-        return catalogConfig;
-    }
-
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index ed868a0ec..1c6b78ae3 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -201,11 +201,6 @@ public class MongoDBSyncTableAction extends ActionBase {
         return tableConfig;
     }
 
-    @VisibleForTesting
-    public Map<String, String> catalogConfig() {
-        return catalogConfig;
-    }
-
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 8fc1ad16e..29c8261f8 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -372,11 +372,6 @@ public class MySqlSyncDatabaseAction extends ActionBase {
         return excludedTables;
     }
 
-    @VisibleForTesting
-    public Map<String, String> catalogConfig() {
-        return catalogConfig;
-    }
-
     @VisibleForTesting
     public Map<String, String> tableConfig() {
         return tableConfig;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 3fca52255..acbec8eba 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -280,11 +280,6 @@ public class MySqlSyncTableAction extends ActionBase {
         return tableConfig;
     }
 
-    @VisibleForTesting
-    public Map<String, String> catalogConfig() {
-        return catalogConfig;
-    }
-
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
index b4f6dcf48..f4e1a4577 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.action.cdc;
 
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.ActionITCaseBase;
 import org.apache.paimon.table.FileStoreTable;
@@ -28,7 +27,11 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,9 +53,19 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
 
     private static final Logger LOG = LoggerFactory.getLogger(CdcActionITCaseBase.class);
 
-    protected FileStoreTable getFileStoreTable(String tableName) throws Exception {
-        Identifier identifier = Identifier.create(database, tableName);
-        return (FileStoreTable) catalog.getTable(identifier);
+    protected StreamExecutionEnvironment env;
+
+    @BeforeEach
+    public void setEnv() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+    }
+
+    @AfterEach
+    public void closeEnv() throws Exception {
+        env.close();
     }
 
     protected void waitingTables(String... tables) throws Exception {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 30a50afe3..79c01d474 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -566,8 +566,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
                         .withTableConfig(Collections.singletonMap("table-key", "table-value"))
                         .build();
 
-        assertThat(action.catalogConfig())
-                .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value"));
+        assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
         assertThat(action.tableConfig())
                 .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 6b50274e9..d21dad37e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -953,8 +953,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                         .withTableConfig(Collections.singletonMap("table-key", "table-value"))
                         .build();
 
-        assertThat(action.catalogConfig())
-                .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value"));
+        assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
         assertThat(action.tableConfig())
                 .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index 0d93200bd..2a06b4fc4 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -126,8 +126,7 @@ public class MongoDBSyncDatabaseActionITCase extends MongoDBActionITCaseBase {
                         .withTableConfig(Collections.singletonMap("table-key", "table-value"))
                         .build();
 
-        assertThat(action.catalogConfig())
-                .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value"));
+        assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
         assertThat(action.tableConfig())
                 .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
index 75642b89b..bd4d78f04 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
@@ -158,8 +158,7 @@ public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase {
                         .withTableConfig(Collections.singletonMap("table-key", "table-value"))
                         .build();
 
-        assertThat(action.catalogConfig())
-                .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value"));
+        assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
         assertThat(action.tableConfig())
                 .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index d41bfe6e7..8990a73b7 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -1190,8 +1190,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
                         .withTableConfig(Collections.singletonMap("table-key", "table-value"))
                         .build();
 
-        assertThat(action.catalogConfig())
-                .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value"));
+        assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
         assertThat(action.tableConfig())
                 .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 4b3c6ef13..d0e725c1e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -851,8 +851,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                         .withTableConfig(Collections.singletonMap("table-key", "table-value"))
                         .build();
 
-        assertThat(action.catalogConfig())
-                .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value"));
+        assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
         assertThat(action.tableConfig())
                 .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 406969ea5..d15068935 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -891,7 +891,7 @@ public class FlinkCatalog extends AbstractCatalog {
      */
     public Procedure getProcedure(ObjectPath procedurePath)
             throws ProcedureNotExistException, CatalogException {
-        return ProcedureUtil.getProcedure(procedurePath.getObjectName())
+        return ProcedureUtil.getProcedure(catalog, procedurePath.getObjectName())
                 .orElseThrow(() -> new ProcedureNotExistException(name, procedurePath));
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 6a4477f47..9c2687bd5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
@@ -43,9 +44,7 @@ import java.util.stream.Collectors;
 /** Abstract base of {@link Action} for table. */
 public abstract class ActionBase implements Action {
 
-    private final Options catalogOptions;
-
-    protected final Map<String, String> catalogConfig;
+    protected final Options catalogOptions;
     protected final Catalog catalog;
     protected final FlinkCatalog flinkCatalog;
     protected final String catalogName = "paimon-" + UUID.randomUUID();
@@ -53,7 +52,6 @@ public abstract class ActionBase implements Action {
     protected final StreamTableEnvironment batchTEnv;
 
     public ActionBase(String warehouse, Map<String, String> catalogConfig) {
-        this.catalogConfig = catalogConfig;
         catalogOptions = Options.fromMap(catalogConfig);
         catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
 
@@ -110,4 +108,9 @@ public abstract class ActionBase implements Action {
 
         return true;
     }
+
+    @VisibleForTesting
+    public Map<String, String> catalogConfig() {
+        return catalogOptions.toMap();
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index b74bd8b2f..cb53c5db7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.action;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -44,24 +43,10 @@ public class CompactActionFactory implements ActionFactory {
 
         CompactAction action;
         if (params.has("order-strategy")) {
-            SortCompactAction sortCompactAction =
-                    new SortCompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig);
-
-            String strategy = params.get("order-strategy");
-            sortCompactAction.withOrderStrategy(strategy);
-
-            if (params.has("order-by")) {
-                String sqlOrderBy = params.get("order-by");
-                if (sqlOrderBy == null) {
-                    throw new IllegalArgumentException("Please specify \"order-by\".");
-                }
-                sortCompactAction.withOrderColumns(Arrays.asList(sqlOrderBy.split(",")));
-            } else {
-                throw new IllegalArgumentException(
-                        "Please specify order columns in parameter --order-by.");
-            }
-
-            action = sortCompactAction;
+            action =
+                    new SortCompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig)
+                            .withOrderStrategy(params.get("order-strategy"))
+                            .withOrderColumns(getRequiredValue(params, "order-by").split(","));
         } else {
             action = new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig);
         }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index 16557a508..522aace00 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -38,10 +38,12 @@ import org.apache.flink.table.data.RowData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -126,11 +128,17 @@ public class SortCompactAction extends CompactAction {
         flinkSinkBuilder.build();
     }
 
-    public void withOrderStrategy(String sortStrategy) {
+    public SortCompactAction withOrderStrategy(String sortStrategy) {
         this.sortStrategy = sortStrategy;
+        return this;
     }
 
-    public void withOrderColumns(List<String> orderColumns) {
-        this.orderColumns = orderColumns;
+    public SortCompactAction withOrderColumns(String... orderColumns) {
+        return withOrderColumns(Arrays.asList(orderColumns));
+    }
+
+    public SortCompactAction withOrderColumns(List<String> orderColumns) {
+        this.orderColumns = orderColumns.stream().map(String::trim).collect(Collectors.toList());
+        return this;
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 3b8147a3e..02b8e1943 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -56,8 +56,6 @@ public abstract class TableActionBase extends ActionBase {
         try {
             table = catalog.getTable(identifier);
         } catch (Catalog.TableNotExistException e) {
-            LOG.error("Table doesn't exist in given path.", e);
-            System.err.println("Table doesn't exist in given path.");
             throw new RuntimeException(e);
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
new file mode 100644
index 000000000..cb8aa3162
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.CompactAction;
+import org.apache.paimon.flink.action.SortCompactAction;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;
+
+/**
+ * Compact procedure. Usage:
+ *
+ * <pre><code>
+ *  -- compact a table (tableId should be 'database_name.table_name')
+ *  CALL compact('tableId')
+ *
+ *  -- compact a table with sorting
+ *  CALL compact('tableId', 'order-strategy', 'order-by-columns')
+ *
+ *  -- compact specific partitions ('pt1=A,pt2=a', 'pt1=B,pt2=b', ...)
+ *  -- NOTE: if you don't need sorting but you want to specify partitions, use '' 
as placeholder
+ *  CALL compact('tableId', '', '', partition1, partition2, ...)
+ * </code></pre>
+ */
+public class CompactProcedure extends ProcedureBase {
+
+    /** Creates the procedure for the given catalog. */
+    public CompactProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    /**
+     * Compacts a table without sorting and without a partition filter.
+     *
+     * <p>Delegates to the four-argument overload with empty sort arguments.
+     *
+     * @param procedureContext supplies the Flink execution environment
+     * @param tableId table to compact, parsed with Identifier.fromString
+     * @return the result array produced by {@code execute}
+     */
+    public String[] call(ProcedureContext procedureContext, String tableId) 
throws Exception {
+        return call(procedureContext, tableId, "", "");
+    }
+
+    /**
+     * Compacts a table, optionally with sorting, without a partition filter.
+     *
+     * <p>Delegates to the varargs overload with an empty partition array.
+     *
+     * @param orderStrategy sort strategy; empty string means no sorting
+     * @param orderByColumns comma-separated sort columns; empty string means
+     *     no sorting
+     * @return the result array produced by {@code execute}
+     */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String orderStrategy,
+            String orderByColumns)
+            throws Exception {
+        return call(procedureContext, tableId, orderStrategy, orderByColumns, 
new String[0]);
+    }
+
+    /**
+     * Builds a compact job (plain or sorting) for the table, optionally
+     * restricted to the given partitions, and runs it through
+     * {@code execute}.
+     *
+     * <p>{@code orderStrategy} and {@code orderByColumns} must be either both
+     * empty (plain compaction) or both non-empty (sort compaction). Neither
+     * may be null, since {@code isEmpty()} is called on both.
+     *
+     * @param procedureContext supplies the Flink execution environment
+     * @param tableId table to compact, parsed with Identifier.fromString
+     * @param orderStrategy sort strategy, or empty string
+     * @param orderByColumns sort columns separated by ',', or empty string
+     * @param partitionStrings zero or more partition specs, each parsed with
+     *     parseCommaSeparatedKeyValues (e.g. 'pt1=A,pt2=a')
+     * @return the result array produced by {@code execute}
+     * @throws IllegalArgumentException if exactly one of the two sort
+     *     arguments is empty
+     */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String orderStrategy,
+            String orderByColumns,
+            String... partitionStrings)
+            throws Exception {
+        // Both casts assume the catalog is an AbstractCatalog; any other
+        // Catalog implementation fails here with a ClassCastException.
+        String warehouse = ((AbstractCatalog) catalog).warehouse();
+        Map<String, String> catalogOptions = ((AbstractCatalog) 
catalog).options();
+        Identifier identifier = Identifier.fromString(tableId);
+        CompactAction action;
+        String jobName;
+        if (orderStrategy.isEmpty() && orderByColumns.isEmpty()) {
+            // No sort arguments: plain compaction.
+            action =
+                    new CompactAction(
+                            warehouse,
+                            identifier.getDatabaseName(),
+                            identifier.getObjectName(),
+                            catalogOptions);
+            jobName = "Compact Job";
+        } else if (!orderStrategy.isEmpty() && !orderByColumns.isEmpty()) {
+            // Both sort arguments present: sort compaction.
+            action =
+                    new SortCompactAction(
+                                    warehouse,
+                                    identifier.getDatabaseName(),
+                                    identifier.getObjectName(),
+                                    catalogOptions)
+                            .withOrderStrategy(orderStrategy)
+                            .withOrderColumns(orderByColumns.split(","));
+            jobName = "Sort Compact Job";
+        } else {
+            // Exactly one of the two sort arguments was given.
+            throw new IllegalArgumentException(
+                    "You must specify 'order strategy' and 'order by columns' 
both.");
+        }
+
+        // Without partition arguments the action gets no partition filter.
+        if (partitionStrings.length != 0) {
+            List<Map<String, String>> partitions = new ArrayList<>();
+            for (String partition : partitionStrings) {
+                partitions.add(parseCommaSeparatedKeyValues(partition));
+            }
+            action.withPartitions(partitions);
+        }
+
+        // Build the job on the environment provided by the caller's context.
+        StreamExecutionEnvironment env = 
procedureContext.getExecutionEnvironment();
+        action.build(env);
+
+        // jobName is only the default; execute() prefers a configured name.
+        return execute(env, jobName);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
new file mode 100644
index 000000000..c44fe2701
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
+
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.procedures.Procedure;
+
+import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
+
+/** Base implementation for Flink {@link Procedure}. */
+public class ProcedureBase implements Procedure {
+
+    // Catalog handed in at construction; available to subclasses.
+    protected final Catalog catalog;
+
+    // Package-private constructor: callable only from within this package.
+    ProcedureBase(Catalog catalog) {
+        this.catalog = catalog;
+    }
+
+    /**
+     * Submits the job built on {@code env} and reports the outcome.
+     *
+     * <p>The job name is the configured {@code PipelineOptions.NAME} if
+     * present, otherwise {@code defaultJobName}. The job is always submitted
+     * asynchronously; when {@code TABLE_DML_SYNC} is true this method then
+     * blocks until the job result is available.
+     *
+     * @param env environment on which the job has already been built
+     * @param defaultJobName name used when no pipeline name is configured
+     * @return {@code {"Success"}} after a synchronous run, otherwise
+     *     {@code {"JobID=<id>"}} for the submitted job
+     * @throws TableException if waiting for the job result fails
+     */
+    protected String[] execute(StreamExecutionEnvironment env, String 
defaultJobName)
+            throws Exception {
+        ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
+        String name = 
conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
+        JobClient jobClient = env.executeAsync(name);
+        String jobId = jobClient.getJobID().toString();
+        if (conf.get(TABLE_DML_SYNC)) {
+            try {
+                // Block until the job finishes.
+                jobClient.getJobExecutionResult().get();
+            } catch (Exception e) {
+                // NOTE(review): this broad catch presumably also covers an
+                // interrupt of the waiting thread; the interrupt flag is
+                // not restored here. Confirm that is acceptable.
+                throw new TableException(String.format("Failed to wait job 
'%s' finish", jobId), e);
+            }
+            return new String[] {"Success"};
+        } else {
+            // Asynchronous mode: hand the job id back without waiting.
+            return new String[] {"JobID=" + jobId};
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
index 8e9a02435..e90450afc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
@@ -18,13 +18,14 @@
 
 package org.apache.paimon.flink.procedure;
 
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.action.CompactActionFactory;
+
 import org.apache.flink.table.procedures.Procedure;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
 /** Utility methods for {@link Procedure}. */
@@ -33,13 +34,21 @@ public class ProcedureUtil {
     private ProcedureUtil() {}
 
     private static final List<String> SYSTEM_PROCEDURES = new ArrayList<>();
-    private static final Map<String, Procedure> SYSTEM_PROCEDURES_MAP = new 
HashMap<>();
+
+    static {
+        SYSTEM_PROCEDURES.add(CompactActionFactory.IDENTIFIER);
+    }
 
     public static List<String> listProcedures() {
         return Collections.unmodifiableList(SYSTEM_PROCEDURES);
     }
 
-    public static Optional<Procedure> getProcedure(String procedureName) {
-        return Optional.ofNullable(SYSTEM_PROCEDURES_MAP.get(procedureName));
+    public static Optional<Procedure> getProcedure(Catalog catalog, String 
procedureName) {
+        switch (procedureName) {
+            case CompactActionFactory.IDENTIFIER:
+                return Optional.of(new CompactProcedure(catalog));
+            default:
+                return Optional.empty();
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index f3a1c5881..0e71d076f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -35,14 +35,21 @@ import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.SnapshotManager;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -55,10 +62,8 @@ public abstract class ActionITCaseBase extends 
AbstractTestBase {
     protected String database;
     protected String tableName;
     protected String commitUser;
-    protected SnapshotManager snapshotManager;
     protected StreamTableWrite write;
     protected StreamTableCommit commit;
-    protected StreamExecutionEnvironment env;
     protected Catalog catalog;
     private long incrementalIdentifier;
 
@@ -69,11 +74,6 @@ public abstract class ActionITCaseBase extends 
AbstractTestBase {
         tableName = "test_table_" + UUID.randomUUID();
         commitUser = UUID.randomUUID().toString();
         incrementalIdentifier = 0;
-        env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
         catalog = CatalogFactory.createCatalog(CatalogContext.create(new 
Path(warehouse)));
     }
 
@@ -81,11 +81,12 @@ public abstract class ActionITCaseBase extends 
AbstractTestBase {
     public void after() throws Exception {
         if (write != null) {
             write.close();
+            write = null;
         }
         if (commit != null) {
             commit.close();
+            commit = null;
         }
-        env.close();
         catalog.close();
     }
 
@@ -114,6 +115,11 @@ public abstract class ActionITCaseBase extends 
AbstractTestBase {
         return (FileStoreTable) catalog.getTable(identifier);
     }
 
+    /**
+     * Loads the named table from the test's {@code database} through the
+     * catalog and casts it to {@link FileStoreTable}.
+     */
+    protected FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
+        Identifier identifier = Identifier.create(database, tableName);
+        return (FileStoreTable) catalog.getTable(identifier);
+    }
+
     protected GenericRow rowData(Object... values) {
         return GenericRow.of(values);
     }
@@ -135,4 +141,45 @@ public abstract class ActionITCaseBase extends 
AbstractTestBase {
             return result;
         }
     }
+
+    /**
+     * Creates a fresh execution environment with restarts disabled and
+     * parallelism 2.
+     *
+     * @param isStreaming true for STREAMING mode with exactly-once
+     *     checkpointing at an interval of 500; false for BATCH mode
+     */
+    protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(2);
+
+        if (isStreaming) {
+            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+            
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+            env.getCheckpointConfig().setCheckpointInterval(500);
+        } else {
+            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        }
+
+        return env;
+    }
+
+    /**
+     * Runs a procedure statement through a table environment built on
+     * {@link #buildDefaultEnv}.
+     *
+     * <p>Registers a catalog named PAIMON on the test warehouse, switches to
+     * it, and executes the statement. The value returned by
+     * {@code executeSql} is ignored.
+     *
+     * @param procedureStatement the SQL statement to execute
+     * @param isStreaming selects streaming or batch environment settings
+     * @param dmlSync value written to {@code TABLE_DML_SYNC}
+     */
+    protected void callProcedure(String procedureStatement, boolean 
isStreaming, boolean dmlSync) {
+        StreamExecutionEnvironment env = buildDefaultEnv(isStreaming);
+
+        TableEnvironment tEnv;
+        if (isStreaming) {
+            tEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.inStreamingMode());
+            // Set the 500 ms checkpoint interval on the table config too.
+            tEnv.getConfig()
+                    .set(
+                            
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
+                            Duration.ofMillis(500));
+        } else {
+            tEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
+        }
+
+        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, dmlSync);
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG PAIMON WITH ('type'='paimon', 
'warehouse'='%s');",
+                        warehouse));
+        tEnv.useCatalog("PAIMON");
+
+        tEnv.executeSql(procedureStatement);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index d2e7e5c17..a05b7c2f6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -21,8 +21,6 @@ package org.apache.paimon.flink.action;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.DataSplit;
@@ -32,10 +30,8 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommonTestUtils;
+import org.apache.paimon.utils.SnapshotManager;
 
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -62,20 +58,11 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
     @Test
     @Timeout(60)
     public void testBatchCompact() throws Exception {
-        Map<String, String> options = new HashMap<>();
-        options.put(CoreOptions.WRITE_ONLY.key(), "true");
-
         FileStoreTable table =
-                createFileStoreTable(
-                        ROW_TYPE,
+                prepareTable(
                         Arrays.asList("dt", "hh"),
                         Arrays.asList("dt", "hh", "k"),
-                        options);
-        snapshotManager = table.snapshotManager();
-        StreamWriteBuilder streamWriteBuilder =
-                table.newStreamWriteBuilder().withCommitUser(commitUser);
-        write = streamWriteBuilder.newWrite();
-        commit = streamWriteBuilder.newCommit();
+                        Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), 
"true"));
 
         writeData(
                 rowData(1, 100, 15, BinaryString.fromString("20221208")),
@@ -87,21 +74,15 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
                 rowData(2, 100, 16, BinaryString.fromString("20221208")),
                 rowData(2, 100, 15, BinaryString.fromString("20221209")));
 
-        Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
-        assertThat(snapshot.id()).isEqualTo(2);
-        
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
-        new CompactAction(warehouse, database, tableName)
-                .withPartitions(getSpecifiedPartitions())
-                .build(env);
-        env.execute();
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            runAction(false);
+        } else {
+            callProcedure(false);
+        }
 
-        snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
-        assertThat(snapshot.id()).isEqualTo(3);
-        
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);
 
         List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
         assertThat(splits.size()).isEqualTo(3);
@@ -118,28 +99,18 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
 
     @Test
     public void testStreamingCompact() throws Exception {
-        Map<String, String> options = new HashMap<>();
-        options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
-        options.put(
-                
FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(),
-                "1s");
-        options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
-        options.put(CoreOptions.WRITE_ONLY.key(), "true");
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.CHANGELOG_PRODUCER.key(), 
"full-compaction");
+        tableOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
+        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), 
"1s");
+        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
         // test that dedicated compact job will expire snapshots
-        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
-        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
+        tableOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
+        tableOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
 
         FileStoreTable table =
-                createFileStoreTable(
-                        ROW_TYPE,
-                        Arrays.asList("dt", "hh"),
-                        Arrays.asList("dt", "hh", "k"),
-                        options);
-        snapshotManager = table.snapshotManager();
-        StreamWriteBuilder streamWriteBuilder =
-                table.newStreamWriteBuilder().withCommitUser(commitUser);
-        write = streamWriteBuilder.newWrite();
-        commit = streamWriteBuilder.newCommit();
+                prepareTable(
+                        Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", 
"k"), tableOptions);
 
         // base records
         writeData(
@@ -147,24 +118,18 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
                 rowData(1, 100, 16, BinaryString.fromString("20221208")),
                 rowData(1, 100, 15, BinaryString.fromString("20221209")));
 
-        Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
-        assertThat(snapshot.id()).isEqualTo(1);
-        
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        checkLatestSnapshot(table, 1, Snapshot.CommitKind.APPEND);
 
         // no full compaction has happened, so plan should be empty
         StreamTableScan scan = table.newReadBuilder().newStreamScan();
         TableScan.Plan plan = scan.plan();
         assertThat(plan.splits()).isEmpty();
 
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-        
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-        env.getCheckpointConfig().setCheckpointInterval(500);
-        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
-        new CompactAction(warehouse, database, tableName)
-                .withPartitions(getSpecifiedPartitions())
-                .build(env);
-        JobClient client = env.executeAsync();
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            runAction(true);
+        } else {
+            callProcedure(true);
+        }
 
         // first full compaction
         validateResult(
@@ -193,6 +158,7 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
                 60_000);
 
         // assert dedicated compact job will expire snapshots
+        SnapshotManager snapshotManager = table.snapshotManager();
         CommonTestUtils.waitUtil(
                 () ->
                         snapshotManager.latestSnapshotId() - 2
@@ -200,27 +166,18 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
                 Duration.ofSeconds(60_000),
                 Duration.ofSeconds(100),
                 String.format("Cannot validate snapshot expiration in %s 
milliseconds.", 60_000));
-
-        client.cancel();
     }
 
     @Test
     public void testUnawareBucketStreamingCompact() throws Exception {
-        Map<String, String> options = new HashMap<>();
-        options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
-        // test that dedicated compact job will expire snapshots
-        options.put(CoreOptions.BUCKET.key(), "-1");
-        options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
-        options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), 
"1s");
+        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
 
         FileStoreTable table =
-                createFileStoreTable(
-                        ROW_TYPE, Collections.singletonList("k"), 
Collections.emptyList(), options);
-        snapshotManager = table.snapshotManager();
-        StreamWriteBuilder streamWriteBuilder =
-                table.newStreamWriteBuilder().withCommitUser(commitUser);
-        write = streamWriteBuilder.newWrite();
-        commit = streamWriteBuilder.newCommit();
+                prepareTable(Collections.singletonList("k"), 
Collections.emptyList(), tableOptions);
 
         // base records
         writeData(
@@ -233,21 +190,16 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
                 rowData(1, 100, 16, BinaryString.fromString("20221208")),
                 rowData(1, 100, 15, BinaryString.fromString("20221209")));
 
-        Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
-        assertThat(snapshot.id()).isEqualTo(2);
-        
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
-
-        FileStoreScan storeScan = table.store().newScan();
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-        
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-        env.getCheckpointConfig().setCheckpointInterval(500);
-        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
-        new CompactAction(warehouse, database, tableName).build(env);
-        JobClient client = env.executeAsync();
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            runAction(true);
+        } else {
+            callProcedure(true);
+        }
 
         // first compaction, snapshot will be 3
-        checkFileAndRowSize(storeScan, 3L, 30_000L, 1, 6);
+        checkFileAndRowSize(table, 3L, 30_000L, 1, 6);
 
         writeData(
                 rowData(1, 101, 15, BinaryString.fromString("20221208")),
@@ -255,27 +207,18 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
                 rowData(1, 101, 15, BinaryString.fromString("20221209")));
 
         // second compaction, snapshot will be 5
-        checkFileAndRowSize(storeScan, 5L, 30_000L, 1, 9);
-
-        client.cancel().get();
+        checkFileAndRowSize(table, 5L, 30_000L, 1, 9);
     }
 
     @Test
     public void testUnawareBucketBatchCompact() throws Exception {
-        Map<String, String> options = new HashMap<>();
-        // test that dedicated compact job will expire snapshots
-        options.put(CoreOptions.BUCKET.key(), "-1");
-        options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
-        options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
 
         FileStoreTable table =
-                createFileStoreTable(
-                        ROW_TYPE, Collections.singletonList("k"), 
Collections.emptyList(), options);
-        snapshotManager = table.snapshotManager();
-        StreamWriteBuilder streamWriteBuilder =
-                table.newStreamWriteBuilder().withCommitUser(commitUser);
-        write = streamWriteBuilder.newWrite();
-        commit = streamWriteBuilder.newCommit();
+                prepareTable(Collections.singletonList("k"), 
Collections.emptyList(), tableOptions);
 
         // base records
         writeData(
@@ -288,19 +231,60 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
                 rowData(1, 100, 16, BinaryString.fromString("20221208")),
                 rowData(1, 100, 15, BinaryString.fromString("20221209")));
 
-        Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
-        assertThat(snapshot.id()).isEqualTo(2);
-        
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
 
-        FileStoreScan storeScan = table.store().newScan();
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
-        new CompactAction(warehouse, database, tableName).build(env);
-        env.execute();
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            runAction(false);
+        } else {
+            callProcedure(false);
+        }
 
         // first compaction, snapshot will be 3.
-        checkFileAndRowSize(storeScan, 3L, 0L, 1, 6);
+        checkFileAndRowSize(table, 3L, 0L, 1, 6);
+    }
+
+    /**
+     * Creates the test table with {@code ROW_TYPE} and the given keys and
+     * options, and initialises the {@code write} and {@code commit} fields
+     * from a stream write builder bound to {@code commitUser}.
+     *
+     * @return the created table
+     */
+    private FileStoreTable prepareTable(
+            List<String> partitionKeys, List<String> primaryKeys, Map<String, 
String> tableOptions)
+            throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(ROW_TYPE, partitionKeys, primaryKeys, 
tableOptions);
+
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
+
+        return table;
+    }
+
+    /**
+     * Asserts that the table's latest snapshot has the expected id and
+     * commit kind.
+     */
+    private void checkLatestSnapshot(
+            FileStoreTable table, long snapshotId, Snapshot.CommitKind 
commitKind) {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        assertThat(snapshot.id()).isEqualTo(snapshotId);
+        assertThat(snapshot.commitKind()).isEqualTo(commitKind);
+    }
+
+    /**
+     * Runs compaction through {@link CompactAction} on the test table,
+     * restricted to the partitions from {@code getSpecifiedPartitions()}.
+     *
+     * @param isStreaming true submits the job asynchronously; false runs it
+     *     with the blocking {@code execute()}
+     */
+    private void runAction(boolean isStreaming) throws Exception {
+        StreamExecutionEnvironment env = buildDefaultEnv(isStreaming);
+
+        new CompactAction(warehouse, database, tableName)
+                .withPartitions(getSpecifiedPartitions())
+                .build(env);
+        if (isStreaming) {
+            // The returned JobClient is discarded; the job is not
+            // cancelled by this method.
+            env.executeAsync();
+        } else {
+            env.execute();
+        }
+    }
+
+    /**
+     * Runs compaction through the {@code compact} procedure on the test
+     * table, with empty sort arguments and two fixed partition specs.
+     *
+     * <p>{@code dmlSync} is passed as {@code !isStreaming}: batch runs are
+     * synchronous, streaming runs are not.
+     */
+    private void callProcedure(boolean isStreaming) {
+        callProcedure(
+                String.format(
+                        "CALL compact('%s.%s', '', '', '%s', '%s')",
+                        database, tableName, "dt=20221208,hh=15", 
"dt=20221209,hh=15"),
+                isStreaming,
+                !isStreaming);
     }
 
     private List<Map<String, String>> getSpecifiedPartitions() {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
index 0f2ada0a9..4c646444c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
@@ -25,6 +25,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SnapshotManager;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -35,6 +36,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** Base IT cases for {@link CompactAction} and {@link CompactDatabaseAction} 
. */
 public class CompactActionITCaseBase extends ActionITCaseBase {
+
     protected void validateResult(
             FileStoreTable table,
             RowType rowType,
@@ -63,8 +65,10 @@ public class CompactActionITCaseBase extends ActionITCaseBase {
     }
 
     protected void checkFileAndRowSize(
-            FileStoreScan scan, Long expectedSnapshotId, Long timeout, int fileNum, long rowCount)
+            FileStoreTable table, Long expectedSnapshotId, Long timeout, int fileNum, long rowCount)
             throws Exception {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        FileStoreScan scan = table.store().newScan();
 
         long start = System.currentTimeMillis();
         while (!Objects.equals(snapshotManager.latestSnapshotId(), expectedSnapshotId)) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index 2c112f290..ce2e72641 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -24,7 +24,6 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamTableCommit;
@@ -37,6 +36,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommonTestUtils;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.core.execution.JobClient;
@@ -75,17 +75,15 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
     private FileStoreTable createTable(
             String databaseName,
             String tableName,
-            RowType rowType,
             List<String> partitionKeys,
             List<String> primaryKeys,
             Map<String, String> options)
             throws Exception {
-
         Identifier identifier = Identifier.create(databaseName, tableName);
         catalog.createDatabase(databaseName, true);
         catalog.createTable(
                 identifier,
-                new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""),
+                new Schema(ROW_TYPE.getFields(), partitionKeys, primaryKeys, options, ""),
                 false);
         return (FileStoreTable) catalog.getTable(identifier);
     }
@@ -107,12 +105,11 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
                         createTable(
                                 dbName,
                                 tableName,
-                                ROW_TYPE,
                                 Arrays.asList("dt", "hh"),
                                 Arrays.asList("dt", "hh", "k"),
                                 options);
                 tables.add(table);
-                snapshotManager = table.snapshotManager();
+                SnapshotManager snapshotManager = table.snapshotManager();
                 StreamWriteBuilder streamWriteBuilder =
                         table.newStreamWriteBuilder().withCommitUser(commitUser);
                 write = streamWriteBuilder.newWrite();
@@ -156,7 +153,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
         env.execute();
 
         for (FileStoreTable table : tables) {
-            snapshotManager = table.snapshotManager();
+            SnapshotManager snapshotManager = table.snapshotManager();
             Snapshot snapshot =
                     table.snapshotManager().snapshot(snapshotManager.latestSnapshotId());
             assertThat(snapshot.id()).isEqualTo(3);
@@ -196,12 +193,11 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
                         createTable(
                                 dbName,
                                 tableName,
-                                ROW_TYPE,
                                 Arrays.asList("dt", "hh"),
                                 Arrays.asList("dt", "hh", "k"),
                                 options);
                 tables.add(table);
-                snapshotManager = table.snapshotManager();
+                SnapshotManager snapshotManager = table.snapshotManager();
                 StreamWriteBuilder streamWriteBuilder =
                         table.newStreamWriteBuilder().withCommitUser(commitUser);
                 write = streamWriteBuilder.newWrite();
@@ -259,7 +255,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
                             "+I[1, 100, 16, 20221208]"),
                     60_000);
 
-            snapshotManager = table.snapshotManager();
+            SnapshotManager snapshotManager = table.snapshotManager();
             StreamWriteBuilder streamWriteBuilder =
                     table.newStreamWriteBuilder().withCommitUser(commitUser);
             write = streamWriteBuilder.newWrite();
@@ -306,12 +302,11 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
                             createTable(
                                     dbName,
                                     tableName,
-                                    ROW_TYPE,
                                     Arrays.asList("dt", "hh"),
                                     Arrays.asList("dt", "hh", "k"),
                                     options);
                     newtables.add(table);
-                    snapshotManager = table.snapshotManager();
+                    SnapshotManager snapshotManager = table.snapshotManager();
                     StreamWriteBuilder streamWriteBuilder =
                             table.newStreamWriteBuilder().withCommitUser(commitUser);
                     write = streamWriteBuilder.newWrite();
@@ -346,7 +341,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
                                 "+I[1, 100, 16, 20221208]"),
                         60_000);
 
-                snapshotManager = table.snapshotManager();
+                SnapshotManager snapshotManager = table.snapshotManager();
                 StreamWriteBuilder streamWriteBuilder =
                         table.newStreamWriteBuilder().withCommitUser(commitUser);
                 write = streamWriteBuilder.newWrite();
@@ -454,7 +449,6 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
                         createTable(
                                 dbName,
                                 tableName,
-                                ROW_TYPE,
                                 Arrays.asList("dt", "hh"),
                                 Arrays.asList("dt", "hh", "k"),
                                 options);
@@ -464,7 +458,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
                     noCompactionTables.add(table);
                 }
 
-                snapshotManager = table.snapshotManager();
+                SnapshotManager snapshotManager = table.snapshotManager();
                 StreamWriteBuilder streamWriteBuilder =
                         table.newStreamWriteBuilder().withCommitUser(commitUser);
                 write = streamWriteBuilder.newWrite();
@@ -508,7 +502,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
         env.execute();
 
         for (FileStoreTable table : compactionTables) {
-            snapshotManager = table.snapshotManager();
+            SnapshotManager snapshotManager = table.snapshotManager();
             Snapshot snapshot =
                     table.snapshotManager().snapshot(snapshotManager.latestSnapshotId());
 
@@ -523,7 +517,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
         }
 
         for (FileStoreTable table : noCompactionTables) {
-            snapshotManager = table.snapshotManager();
+            SnapshotManager snapshotManager = table.snapshotManager();
             Snapshot snapshot =
                     table.snapshotManager().snapshot(snapshotManager.latestSnapshotId());
 
@@ -553,12 +547,11 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
                     createTable(
                             database,
                             tableName,
-                            ROW_TYPE,
-                            Arrays.asList("k"),
+                            Collections.singletonList("k"),
                             Collections.emptyList(),
                             options);
             tables.add(table);
-            snapshotManager = table.snapshotManager();
+            SnapshotManager snapshotManager = table.snapshotManager();
             StreamWriteBuilder streamWriteBuilder =
                     table.newStreamWriteBuilder().withCommitUser(commitUser);
             write = streamWriteBuilder.newWrite();
@@ -593,16 +586,13 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
         JobClient client = env.executeAsync();
 
         for (FileStoreTable table : tables) {
-            FileStoreScan storeScan = table.store().newScan();
-
-            snapshotManager = table.snapshotManager();
             StreamWriteBuilder streamWriteBuilder =
                     table.newStreamWriteBuilder().withCommitUser(commitUser);
             write = streamWriteBuilder.newWrite();
             commit = streamWriteBuilder.newCommit();
 
             // first compaction, snapshot will be 3
-            checkFileAndRowSize(storeScan, 3L, 30_000L, 1, 6);
+            checkFileAndRowSize(table, 3L, 30_000L, 1, 6);
 
             writeData(
                     rowData(1, 101, 15, BinaryString.fromString("20221208")),
@@ -610,7 +600,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
                     rowData(1, 101, 15, BinaryString.fromString("20221209")));
 
             // second compaction, snapshot will be 5
-            checkFileAndRowSize(storeScan, 5L, 30_000L, 1, 9);
+            checkFileAndRowSize(table, 5L, 30_000L, 1, 9);
         }
 
         client.cancel().get();
@@ -630,12 +620,11 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
                     createTable(
                             database,
                             tableName,
-                            ROW_TYPE,
                             Collections.singletonList("k"),
                             Collections.emptyList(),
                             options);
             tables.add(table);
-            snapshotManager = table.snapshotManager();
+            SnapshotManager snapshotManager = table.snapshotManager();
             StreamWriteBuilder streamWriteBuilder =
                     table.newStreamWriteBuilder().withCommitUser(commitUser);
             write = streamWriteBuilder.newWrite();
@@ -668,10 +657,8 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
         env.execute();
 
         for (FileStoreTable table : tables) {
-            FileStoreScan storeScan = table.store().newScan();
-            snapshotManager = table.snapshotManager();
             // first compaction, snapshot will be 3.
-            checkFileAndRowSize(storeScan, 3L, 0L, 1, 6);
+            checkFileAndRowSize(table, 3L, 0L, 1, 6);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
index ab20863e8..b250c80e2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.BeforeEach;
@@ -79,6 +80,7 @@ public class DeleteActionITCase extends ActionITCaseBase {
 
         action.run();
 
+        SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager();
         Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
         assertThat(snapshot.id()).isEqualTo(2);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
@@ -156,7 +158,7 @@ public class DeleteActionITCase extends ActionITCaseBase {
                         Collections.emptyList(),
                         hasPk ? Collections.singletonList("k") : Collections.emptyList(),
                         options);
-        snapshotManager = table.snapshotManager();
+        SnapshotManager snapshotManager = table.snapshotManager();
         StreamWriteBuilder streamWriteBuilder =
                 table.newStreamWriteBuilder().withCommitUser(commitUser);
         write = streamWriteBuilder.newWrite();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
index c5c4c941b..e477adfed 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
@@ -27,6 +27,7 @@ import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -61,6 +62,7 @@ public class DropPartitionActionITCase extends ActionITCaseBase {
                         Collections.emptyMap())
                 .run();
 
+        SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager();
         Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
         assertThat(snapshot.id()).isEqualTo(5);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
@@ -112,6 +114,7 @@ public class DropPartitionActionITCase extends ActionITCaseBase {
                         Collections.emptyMap())
                 .run();
 
+        SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager();
         Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
         assertThat(snapshot.id()).isEqualTo(5);
         assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
@@ -153,7 +156,7 @@ public class DropPartitionActionITCase extends ActionITCaseBase {
                                 ? Arrays.asList("partKey0", "partKey1", "dt")
                                 : Collections.emptyList(),
                         new HashMap<>());
-        snapshotManager = table.snapshotManager();
+        SnapshotManager snapshotManager = table.snapshotManager();
         StreamWriteBuilder streamWriteBuilder =
                 table.newStreamWriteBuilder().withCommitUser(commitUser);
         write = streamWriteBuilder.newWrite();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
index 38555d47d..97c8637a5 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
@@ -56,7 +56,6 @@ public class RollbackToActionITCase extends ActionITCaseBase {
                         Collections.emptyList(),
                         Collections.singletonList("k"),
                         Collections.emptyMap());
-        snapshotManager = table.snapshotManager();
         StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
         write = writeBuilder.newWrite();
         commit = writeBuilder.newCommit();
@@ -83,7 +82,6 @@ public class RollbackToActionITCase extends ActionITCaseBase {
                         Collections.emptyList(),
                         Collections.singletonList("k"),
                         Collections.emptyMap());
-        snapshotManager = table.snapshotManager();
         StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
         write = writeBuilder.newWrite();
         commit = writeBuilder.newCommit();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
similarity index 86%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
index a4e2424f0..97bfea0a1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
@@ -18,19 +18,13 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
@@ -44,7 +38,6 @@ import org.apache.paimon.types.DataTypes;
 
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -54,13 +47,10 @@ import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /** Order Rewrite Action tests for {@link SortCompactAction}. */
-public class OrderRewriteActionITCase extends ActionITCaseBase {
+public class SortCompactActionITCase extends ActionITCaseBase {
 
     private static final Random random = new Random();
 
-    private Catalog catalog;
-    @TempDir private java.nio.file.Path path;
-
     private void prepareData(int size, int loop) throws Exception {
         createTable();
         List<CommitMessage> commitMessages = new ArrayList<>();
@@ -217,45 +207,43 @@ public class OrderRewriteActionITCase extends ActionITCaseBase {
     }
 
     private void zorder(List<String> columns) throws Exception {
-        SortCompactAction sortCompactAction =
-                new SortCompactAction(
-                        new Path(path.toUri()).toUri().toString(),
-                        "my_db",
-                        "Orders1",
-                        Collections.emptyMap());
-        sortCompactAction.withOrderStrategy("zorder");
-        sortCompactAction.withOrderColumns(columns);
-        sortCompactAction.run();
+        if (random.nextBoolean()) {
+            new SortCompactAction(warehouse, database, tableName, Collections.emptyMap())
+                    .withOrderStrategy("zorder")
+                    .withOrderColumns(columns)
+                    .run();
+        } else {
+            callProcedure("zorder", columns);
+        }
     }
 
     private void order(List<String> columns) throws Exception {
-        SortCompactAction sortCompactAction =
-                new SortCompactAction(
-                        new Path(path.toUri()).toUri().toString(),
-                        "my_db",
-                        "Orders1",
-                        Collections.emptyMap());
-        sortCompactAction.withOrderStrategy("order");
-        sortCompactAction.withOrderColumns(columns);
-        sortCompactAction.run();
+        if (random.nextBoolean()) {
+            new SortCompactAction(warehouse, database, tableName, Collections.emptyMap())
+                    .withOrderStrategy("order")
+                    .withOrderColumns(columns)
+                    .run();
+        } else {
+            callProcedure("order", columns);
+        }
     }
 
-    public Catalog getCatalog() {
-        if (catalog == null) {
-            Options options = new Options();
-            options.set(CatalogOptions.WAREHOUSE, new Path(path.toUri()).toUri().toString());
-            catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
-        }
-        return catalog;
+    private void callProcedure(String orderStrategy, List<String> orderByColumns) {
+        callProcedure(
+                String.format(
+                        "CALL compact('%s.%s', '%s', '%s')",
+                        database, tableName, orderStrategy, String.join(",", orderByColumns)),
+                false,
+                true);
     }
 
     public void createTable() throws Exception {
-        getCatalog().createDatabase("my_db", true);
-        getCatalog().createTable(identifier(), schema(), true);
+        catalog.createDatabase(database, true);
+        catalog.createTable(identifier(), schema(), true);
     }
 
     public Identifier identifier() {
-        return Identifier.create("my_db", "Orders1");
+        return Identifier.create(database, tableName);
     }
 
     private void commit(List<CommitMessage> messages) throws Exception {
@@ -299,7 +287,7 @@ public class OrderRewriteActionITCase extends ActionITCaseBase {
     }
 
     public Table getTable() throws Exception {
-        return getCatalog().getTable(identifier());
+        return catalog.getTable(identifier());
     }
 
     private static List<CommitMessage> writeOnce(Table table, int p, int size) throws Exception {
diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml
index e92bb526b..8daa819ee 100644
--- a/paimon-flink/pom.xml
+++ b/paimon-flink/pom.xml
@@ -39,6 +39,7 @@ under the License.
         <module>paimon-flink-1.15</module>
         <module>paimon-flink-1.16</module>
         <module>paimon-flink-1.17</module>
+        <module>paimon-flink-1.18</module>
         <module>paimon-flink-action</module>
         <module>paimon-flink-cdc</module>
     </modules>
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index e075d7e21..8893960ea 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -441,7 +441,7 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    protected String warehouse() {
+    public String warehouse() {
         return warehouse;
     }
 

Reply via email to