This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 399883c99 [flink] Support multiple tables in dedicated compaction job 
(#1852)
399883c99 is described below

commit 399883c99fc40f52aeadd28b617427e0639a8b6b
Author: JunZhang <[email protected]>
AuthorDate: Mon Aug 21 17:53:16 2023 +0800

    [flink] Support multiple tables in dedicated compaction job (#1852)
---
 docs/content/maintenance/multiple-writers.md       |  39 +-
 .../paimon/flink/action/CompactDatabaseAction.java | 176 +++++++
 .../flink/action/CompactDatabaseActionFactory.java |  86 +++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../paimon/flink/action/CompactActionITCase.java   |  54 +-
 .../flink/action/CompactActionITCaseBase.java      |  86 +++
 .../flink/action/CompactDatabaseActionITCase.java  | 574 +++++++++++++++++++++
 7 files changed, 964 insertions(+), 52 deletions(-)

diff --git a/docs/content/maintenance/multiple-writers.md 
b/docs/content/maintenance/multiple-writers.md
index 17ede6059..2de3a7f97 100644
--- a/docs/content/maintenance/multiple-writers.md
+++ b/docs/content/maintenance/multiple-writers.md
@@ -95,6 +95,23 @@ Run the following command to submit a compaction job for the 
table.
     [--partition <partition-name>] \
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]] \
 ```
+
+Or run the following command to submit a compaction job for multiple databases.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact-database \
+    --warehouse <warehouse-path> \
+    --database <database-name> \ 
+    [--including-tables <paimon-table-name|name-regular-expr>] \
+    [--excluding-tables <paimon-table-name|name-regular-expr>] \
+    [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]] 
+```
+
+* `--database` is used to specify which database is to be compacted. In 
compact mode, you need to specify a database name; in compact-database mode, 
you can specify multiple databases, and regular expressions are supported.
+* `--including-tables` is used to specify which source tables are to be 
compacted. You must use '|' to separate multiple tables; the format is 
`databaseName.tableName`, and regular expressions are supported. For example, 
specifying "--including-tables db1.t1|db2.+" means to compact table 'db1.t1' 
and all tables in the db2 database.
+* `--excluding-tables` is used to specify which source tables are not to be 
compacted. The usage is the same as "--including-tables". "--excluding-tables" has 
higher priority than "--including-tables" if you specify both.
 * `--catalog-conf` is the configuration for Paimon catalog. Each configuration 
should be specified in the format `key=value`. See [here]({{< ref 
"maintenance/configurations" >}}) for a complete list of catalog configurations.
 
 If you submit a batch job (set `execution.runtime-mode: batch` in Flink's 
configuration), all current table files will be compacted. If you submit a 
streaming job (set `execution.runtime-mode: streaming` in Flink's 
configuration), the job will continuously monitor new changes to the table and 
perform compactions as needed.
@@ -105,7 +122,7 @@ If you only want to submit the compaction job and don't 
want to wait until the j
 
 {{< /hint >}}
 
-Example
+Example 1: compact table
 
 ```bash
 <FLINK_HOME>/bin/flink run \
@@ -121,6 +138,19 @@ Example
     --catalog-conf s3.secret-key=*****
 ```
 
+Example 2: compact database
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact-database \
+    --warehouse s3:///path/to/warehouse \
+    --database test_db \
+    --catalog-conf s3.endpoint=https://****.com \
+    --catalog-conf s3.access-key=***** \
+    --catalog-conf s3.secret-key=*****
+```
+
 For more usage of the compact action, see
 
 ```bash
@@ -128,6 +158,13 @@ For more usage of the compact action, see
     /path/to/paimon-flink-action-{{< version >}}.jar \
     compact --help
 ```
+or
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact-database --help
+```
 
 {{< /tab >}}
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
new file mode 100644
index 000000000..1be9e7b15
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
+import org.apache.paimon.flink.sink.CompactorSinkBuilder;
+import org.apache.paimon.flink.source.CompactorSourceBuilder;
+import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Database compact action for Flink. */
+public class CompactDatabaseAction extends ActionBase {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CompactDatabaseAction.class);
+
+    private final Pattern includingPattern;
+    private final Pattern excludingPattern;
+    private final String database;
+    private final Map<String, Table> tableMap = new HashMap<>();
+
+    public CompactDatabaseAction(
+            String warehouse,
+            String database,
+            @Nullable String includingTables,
+            @Nullable String excludingTables,
+            Map<String, String> catalogConfig) {
+        super(warehouse, catalogConfig);
+        this.database = database;
+        this.includingPattern = Pattern.compile(includingTables == null ? ".*" 
: includingTables);
+        this.excludingPattern = excludingTables == null ? null : 
Pattern.compile(excludingTables);
+    }
+
+    private boolean shouldCompactionTable(String paimonFullTableName) {
+        boolean shouldCompaction = 
includingPattern.matcher(paimonFullTableName).matches();
+        if (excludingPattern != null) {
+            shouldCompaction =
+                    shouldCompaction && 
!excludingPattern.matcher(paimonFullTableName).matches();
+        }
+        if (!shouldCompaction) {
+            LOG.debug("Source table '{}' is excluded.", paimonFullTableName);
+        }
+        return shouldCompaction;
+    }
+
+    public void build(StreamExecutionEnvironment env) {
+        try {
+            Pattern databasePattern = Pattern.compile(database);
+            List<String> databases = catalog.listDatabases();
+            for (String databaseName : databases) {
+                Matcher databaseMatcher = 
databasePattern.matcher(databaseName);
+                if (databaseMatcher.matches()) {
+                    List<String> tables = catalog.listTables(databaseName);
+                    for (String tableName : tables) {
+                        String fullTableName = String.format("%s.%s", 
databaseName, tableName);
+                        if (shouldCompactionTable(fullTableName)) {
+                            Table table =
+                                    
catalog.getTable(Identifier.create(databaseName, tableName));
+                            if (!(table instanceof FileStoreTable)) {
+                                LOG.error(
+                                        String.format(
+                                                "Only FileStoreTable supports 
compact action. The table type is '%s'.",
+                                                table.getClass().getName()));
+                                continue;
+                            }
+                            table =
+                                    table.copy(
+                                            Collections.singletonMap(
+                                                    
CoreOptions.WRITE_ONLY.key(), "false"));
+                            tableMap.put(fullTableName, table);
+                        } else {
+                            LOG.debug("The table {} is excluded.", 
fullTableName);
+                        }
+                    }
+                }
+            }
+        } catch (Catalog.DatabaseNotExistException | 
Catalog.TableNotExistException e) {
+            throw new RuntimeException(e);
+        }
+
+        ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
+        boolean isStreaming =
+                conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING;
+        for (Map.Entry<String, Table> entry : tableMap.entrySet()) {
+            FileStoreTable fileStoreTable = (FileStoreTable) entry.getValue();
+            switch (fileStoreTable.bucketMode()) {
+                case UNAWARE:
+                    {
+                        buildForUnawareBucketCompaction(
+                                env,
+                                entry.getKey(),
+                                (AppendOnlyFileStoreTable) entry.getValue(),
+                                isStreaming);
+                        break;
+                    }
+                case FIXED:
+                case DYNAMIC:
+                default:
+                    {
+                        buildForTraditionalCompaction(
+                                env, entry.getKey(), fileStoreTable, 
isStreaming);
+                    }
+            }
+        }
+    }
+
+    private void buildForTraditionalCompaction(
+            StreamExecutionEnvironment env,
+            String fullName,
+            FileStoreTable table,
+            boolean isStreaming) {
+
+        CompactorSourceBuilder sourceBuilder = new 
CompactorSourceBuilder(fullName, table);
+        CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
+
+        DataStreamSource<RowData> source =
+                
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
+        sinkBuilder.withInput(source).build();
+    }
+
+    private void buildForUnawareBucketCompaction(
+            StreamExecutionEnvironment env,
+            String fullName,
+            AppendOnlyFileStoreTable table,
+            boolean isStreaming) {
+        UnawareBucketCompactionTopoBuilder unawareBucketCompactionTopoBuilder =
+                new UnawareBucketCompactionTopoBuilder(env, fullName, table);
+
+        unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
+        unawareBucketCompactionTopoBuilder.build();
+    }
+
+    @Override
+    public void run() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        build(env);
+        execute(env, "Compact database job");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
new file mode 100644
index 000000000..345733616
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Factory to create {@link CompactDatabaseAction}. */
+public class CompactDatabaseActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "compact-database";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterTool params) {
+        String warehouse = params.get("warehouse");
+        String database = params.get("database");
+        String includingTables = params.get("including-tables");
+        String excludingTables = params.get("excluding-tables");
+        Map<String, String> catalogConfig = optionalConfigMap(params, 
"catalog-conf");
+
+        CompactDatabaseAction action =
+                new CompactDatabaseAction(
+                        warehouse, database, includingTables, excludingTables, 
catalogConfig);
+        return Optional.of(action);
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println(
+                "Action \"compact-database\" runs a dedicated job for 
compacting one or multiple database.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  compact-database --warehouse <warehouse-path> --database 
<database-name> "
+                        + "[--including-tables 
<paimon-table-name|name-regular-expr>] "
+                        + "[--excluding-tables 
<paimon-table-name|name-regular-expr>] ");
+        System.out.println(
+                "  compact-database --warehouse s3://path/to/warehouse 
--database <database-name> "
+                        + "[--catalog-conf <paimon-catalog-conf> 
[--catalog-conf <paimon-catalog-conf> ...]]");
+        System.out.println();
+
+        System.out.println(
+                "--including-tables is used to specify which source tables are 
to be compacted. "
+                        + "You must use '|' to separate multiple tables, the 
format is `databaseName.tableName`, Regular expression is supported.");
+        System.out.println(
+                "--excluding-tables is used to specify which source tables are 
not to be compacted. "
+                        + "The usage is same as --including-tables.");
+        System.out.println(
+                "--excluding-tables has higher priority than 
--including-tables if you specified both.");
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  compact-database --warehouse hdfs:///path/to/warehouse 
--database test_db");
+        System.out.println(
+                "  compact-database --warehouse s3:///path/to/warehouse "
+                        + "--database test_db "
+                        + "--catalog-conf s3.endpoint=https://****.com "
+                        + "--catalog-conf s3.access-key=***** "
+                        + "--catalog-conf s3.secret-key=***** ");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 2e67176dc..c77a3ed38 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -17,6 +17,7 @@ org.apache.paimon.flink.kafka.KafkaLogStoreFactory
 
 ### action factories
 org.apache.paimon.flink.action.CompactActionFactory
+org.apache.paimon.flink.action.CompactDatabaseActionFactory
 org.apache.paimon.flink.action.DropPartitionActionFactory
 org.apache.paimon.flink.action.DeleteActionFactory
 org.apache.paimon.flink.action.MergeIntoActionFactory
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 4fa96df1d..d2e7e5c17 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -22,8 +22,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
@@ -43,20 +41,17 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeoutException;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for {@link CompactAction}. */
-public class CompactActionITCase extends ActionITCaseBase {
+public class CompactActionITCase extends CompactActionITCaseBase {
 
     private static final DataType[] FIELD_TYPES =
             new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), 
DataTypes.STRING()};
@@ -174,6 +169,7 @@ public class CompactActionITCase extends ActionITCaseBase {
         // first full compaction
         validateResult(
                 table,
+                ROW_TYPE,
                 scan,
                 Arrays.asList("+I[1, 100, 15, 20221208]", "+I[1, 100, 15, 
20221209]"),
                 60_000);
@@ -187,6 +183,7 @@ public class CompactActionITCase extends ActionITCaseBase {
         // second full compaction
         validateResult(
                 table,
+                ROW_TYPE,
                 scan,
                 Arrays.asList(
                         "+U[1, 101, 15, 20221208]",
@@ -317,49 +314,4 @@ public class CompactActionITCase extends ActionITCaseBase {
 
         return Arrays.asList(partition1, partition2);
     }
-
-    private void validateResult(
-            FileStoreTable table, StreamTableScan scan, List<String> expected, 
long timeout)
-            throws Exception {
-        List<String> actual = new ArrayList<>();
-        long start = System.currentTimeMillis();
-        while (actual.size() != expected.size()) {
-            TableScan.Plan plan = scan.plan();
-            actual.addAll(getResult(table.newReadBuilder().newRead(), 
plan.splits(), ROW_TYPE));
-
-            if (System.currentTimeMillis() - start > timeout) {
-                break;
-            }
-        }
-        if (actual.size() != expected.size()) {
-            throw new TimeoutException(
-                    String.format(
-                            "Cannot collect %s records in %s milliseconds.",
-                            expected.size(), timeout));
-        }
-        actual.sort(String::compareTo);
-        assertThat(actual).isEqualTo(expected);
-    }
-
-    private void checkFileAndRowSize(
-            FileStoreScan scan, Long expectedSnapshotId, Long timeout, int 
fileNum, long rowCount)
-            throws Exception {
-
-        long start = System.currentTimeMillis();
-        while (!Objects.equals(snapshotManager.latestSnapshotId(), 
expectedSnapshotId)) {
-            Thread.sleep(500);
-            if (System.currentTimeMillis() - start > timeout) {
-                throw new RuntimeException("can't wait for a compaction.");
-            }
-        }
-
-        List<ManifestEntry> files =
-                
scan.withSnapshot(snapshotManager.latestSnapshotId()).plan().files(FileKind.ADD);
-        long count = 0;
-        for (ManifestEntry file : files) {
-            count += file.file().rowCount();
-        }
-        assertThat(files.size()).isEqualTo(fileNum);
-        assertThat(count).isEqualTo(rowCount);
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
new file mode 100644
index 000000000..0f2ada0a9
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base IT cases for {@link CompactAction} and {@link CompactDatabaseAction} 
. */
+public class CompactActionITCaseBase extends ActionITCaseBase {
+    protected void validateResult(
+            FileStoreTable table,
+            RowType rowType,
+            StreamTableScan scan,
+            List<String> expected,
+            long timeout)
+            throws Exception {
+        List<String> actual = new ArrayList<>();
+        long start = System.currentTimeMillis();
+        while (actual.size() != expected.size()) {
+            TableScan.Plan plan = scan.plan();
+            actual.addAll(getResult(table.newReadBuilder().newRead(), 
plan.splits(), rowType));
+
+            if (System.currentTimeMillis() - start > timeout) {
+                break;
+            }
+        }
+        if (actual.size() != expected.size()) {
+            throw new TimeoutException(
+                    String.format(
+                            "Cannot collect %s records in %s milliseconds.",
+                            expected.size(), timeout));
+        }
+        actual.sort(String::compareTo);
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    protected void checkFileAndRowSize(
+            FileStoreScan scan, Long expectedSnapshotId, Long timeout, int 
fileNum, long rowCount)
+            throws Exception {
+
+        long start = System.currentTimeMillis();
+        while (!Objects.equals(snapshotManager.latestSnapshotId(), 
expectedSnapshotId)) {
+            Thread.sleep(500);
+            if (System.currentTimeMillis() - start > timeout) {
+                throw new RuntimeException("can't wait for a compaction.");
+            }
+        }
+
+        List<ManifestEntry> files =
+                
scan.withSnapshot(snapshotManager.latestSnapshotId()).plan().files(FileKind.ADD);
+        long count = 0;
+        for (ManifestEntry file : files) {
+            count += file.file().rowCount();
+        }
+        assertThat(files.size()).isEqualTo(fileNum);
+        assertThat(count).isEqualTo(rowCount);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
new file mode 100644
index 000000000..f0b83d989
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CommonTestUtils;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link CompactDatabaseAction}. */
+public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
+
+    private static final String[] DATABASE_NAMES = new String[] {"db1", "db2"};
+    private static final String[] TABLE_NAMES = new String[] {"t1", "t2"};
+    private static final Map<String, RowType> ROW_TYPE_MAP = new 
HashMap<>(TABLE_NAMES.length);
+
+    /**
+     * Fills {@code ROW_TYPE_MAP} with one row type per table name. The two tables deliberately
+     * differ in their second column: {@code t1} has an INT column {@code v}, {@code t2} has a
+     * BIGINT column {@code v1}.
+     */
+    @BeforeAll
+    public static void beforeAll() {
+        String[] t1FieldNames = new String[] {"k", "v", "hh", "dt"};
+        DataType[] t1FieldTypes =
+                new DataType[] {
+                    DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()
+                };
+
+        String[] t2FieldNames = new String[] {"k", "v1", "hh", "dt"};
+        DataType[] t2FieldTypes =
+                new DataType[] {
+                    DataTypes.INT(), DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()
+                };
+
+        ROW_TYPE_MAP.put("t1", RowType.of(t1FieldTypes, t1FieldNames));
+        ROW_TYPE_MAP.put("t2", RowType.of(t2FieldTypes, t2FieldNames));
+    }
+
+    /**
+     * Creates the database (if needed) and the table in the test catalog, then loads the table
+     * back from the catalog.
+     *
+     * @param databaseName database to create the table in
+     * @param tableName name of the new table
+     * @param rowType row type whose fields become the table columns
+     * @param partitionKeys partition key column names
+     * @param primaryKeys primary key column names, empty for a table without primary keys
+     * @param options table options
+     * @return the freshly created table
+     */
+    private FileStoreTable createTable(
+            String databaseName,
+            String tableName,
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            Map<String, String> options)
+            throws Exception {
+
+        Catalog testCatalog = catalog();
+        Identifier tableId = Identifier.create(databaseName, tableName);
+
+        // presumably the flags mean "ignore if exists": true for the database, false for the table
+        testCatalog.createDatabase(databaseName, true);
+        Schema schema = new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, "");
+        testCatalog.createTable(tableId, schema, false);
+
+        return (FileStoreTable) testCatalog.getTable(tableId);
+    }
+
+    /**
+     * Writes two commits into every table of {@code db1} and {@code db2} with write-only enabled,
+     * runs {@link CompactDatabaseAction} in batch mode over both databases, and expects each table
+     * to end up with a third snapshot of kind COMPACT and exactly one data file per split.
+     */
+    @Test
+    @Timeout(60)
+    public void testBatchCompact() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.WRITE_ONLY.key(), "true");
+
+        List<FileStoreTable> tables = new ArrayList<>();
+
+        for (String dbName : DATABASE_NAMES) {
+            for (String tableName : TABLE_NAMES) {
+                FileStoreTable table =
+                        createTable(
+                                dbName,
+                                tableName,
+                                ROW_TYPE_MAP.get(tableName),
+                                Arrays.asList("dt", "hh"),
+                                Arrays.asList("dt", "hh", "k"),
+                                options);
+                tables.add(table);
+                snapshotManager = table.snapshotManager();
+                StreamWriteBuilder streamWriteBuilder =
+                        table.newStreamWriteBuilder().withCommitUser(commitUser);
+                write = streamWriteBuilder.newWrite();
+                commit = streamWriteBuilder.newCommit();
+
+                // the value column is INT in t1 and BIGINT in t2, so the literal type must match
+                Object value = null;
+                if (tableName.equals("t1")) {
+                    value = 100;
+                } else if (tableName.equals("t2")) {
+                    value = 100L;
+                }
+
+                writeData(
+                        rowData(1, value, 15, BinaryString.fromString("20221208")),
+                        rowData(1, value, 16, BinaryString.fromString("20221208")),
+                        rowData(1, value, 15, BinaryString.fromString("20221209")));
+
+                writeData(
+                        rowData(2, value, 15, BinaryString.fromString("20221208")),
+                        rowData(2, value, 16, BinaryString.fromString("20221208")),
+                        rowData(2, value, 15, BinaryString.fromString("20221209")));
+
+                Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+                assertThat(snapshot.id()).isEqualTo(2);
+                assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+            }
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        new CompactDatabaseAction(warehouse, "db1|db2", null, null, new HashMap<>()).build(env);
+        env.execute();
+
+        for (FileStoreTable table : tables) {
+            // Re-point the shared field at this table: it still refers to the last table created
+            // above, so using it unchanged would look up another table's latest snapshot id.
+            snapshotManager = table.snapshotManager();
+            Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+            assertThat(snapshot.id()).isEqualTo(3);
+            assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+
+            List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+            assertThat(splits.size()).isEqualTo(3);
+            for (DataSplit split : splits) {
+                assertThat(split.dataFiles().size()).isEqualTo(1);
+            }
+        }
+    }
+
+    /** Includes only {@code db1.t1}; the three remaining tables are expected to stay uncompacted. */
+    @Test
+    @Timeout(60)
+    public void includeTableCompaction() throws Exception {
+        List<Identifier> compacted = Collections.singletonList(Identifier.fromString("db1.t1"));
+        List<Identifier> untouched =
+                Arrays.asList(
+                        Identifier.fromString("db1.t2"),
+                        Identifier.fromString("db2.t1"),
+                        Identifier.fromString("db2.t2"));
+
+        includingAndExcludingTablesImpl("db1.t1", null, compacted, untouched);
+    }
+
+    /** Excludes only {@code db2.t2}; the three remaining tables are expected to be compacted. */
+    @Test
+    @Timeout(60)
+    public void excludeTableCompaction() throws Exception {
+        List<Identifier> compacted =
+                Arrays.asList(
+                        Identifier.fromString("db1.t1"),
+                        Identifier.fromString("db1.t2"),
+                        Identifier.fromString("db2.t1"));
+        List<Identifier> untouched = Collections.singletonList(Identifier.fromString("db2.t2"));
+
+        includingAndExcludingTablesImpl(null, "db2.t2", compacted, untouched);
+    }
+
+    /**
+     * Combines both patterns: {@code db1.+|db2.t1} is included and {@code db1.t2} is excluded, so
+     * {@code db1.t1} and {@code db2.t1} are expected to be compacted while {@code db1.t2} and
+     * {@code db2.t2} are not.
+     */
+    @Test
+    @Timeout(60)
+    public void includeAndExcludeTableCompaction() throws Exception {
+        List<Identifier> compacted =
+                Arrays.asList(Identifier.fromString("db1.t1"), Identifier.fromString("db2.t1"));
+        List<Identifier> untouched =
+                Arrays.asList(Identifier.fromString("db1.t2"), Identifier.fromString("db2.t2"));
+
+        includingAndExcludingTablesImpl("db1.+|db2.t1", "db1.t2", compacted, untouched);
+    }
+
+    /**
+     * Shared body of the including/excluding table tests.
+     *
+     * <p>Creates {@code t1} and {@code t2} in both databases, commits two batches of records to
+     * each of them, runs a batch {@link CompactDatabaseAction} over {@code db1|db2} with the given
+     * patterns, and then checks that the tables listed in {@code includeTables} gained a COMPACT
+     * snapshot while those listed in {@code excludeTables} are still at their second APPEND
+     * snapshot.
+     *
+     * @param includingPattern including-tables pattern handed to the action, may be null
+     * @param excludesPattern excluding-tables pattern handed to the action, may be null
+     * @param includeTables identifiers of the tables expected to be compacted
+     * @param excludeTables identifiers of the tables expected to be left untouched
+     */
+    private void includingAndExcludingTablesImpl(
+            String includingPattern,
+            String excludesPattern,
+            List<Identifier> includeTables,
+            List<Identifier> excludeTables)
+            throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+
+        List<FileStoreTable> expectCompacted = new ArrayList<>();
+        List<FileStoreTable> expectUntouched = new ArrayList<>();
+
+        for (String dbName : DATABASE_NAMES) {
+            for (String tableName : TABLE_NAMES) {
+                FileStoreTable table =
+                        createTable(
+                                dbName,
+                                tableName,
+                                ROW_TYPE_MAP.get(tableName),
+                                Arrays.asList("dt", "hh"),
+                                Arrays.asList("dt", "hh", "k"),
+                                tableOptions);
+
+                // sort the table into the group whose post-compaction state is verified below
+                Identifier tableId = Identifier.create(dbName, tableName);
+                if (includeTables.contains(tableId)) {
+                    expectCompacted.add(table);
+                } else if (excludeTables.contains(tableId)) {
+                    expectUntouched.add(table);
+                }
+
+                snapshotManager = table.snapshotManager();
+                StreamWriteBuilder writeBuilder =
+                        table.newStreamWriteBuilder().withCommitUser(commitUser);
+                write = writeBuilder.newWrite();
+                commit = writeBuilder.newCommit();
+
+                // the value column is INT in t1 and BIGINT in t2
+                Object value;
+                switch (tableName) {
+                    case "t1":
+                        value = 100;
+                        break;
+                    case "t2":
+                        value = 100L;
+                        break;
+                    default:
+                        value = null;
+                        break;
+                }
+
+                BinaryString day8 = BinaryString.fromString("20221208");
+                BinaryString day8Again = BinaryString.fromString("20221208");
+                BinaryString day9 = BinaryString.fromString("20221209");
+                writeData(
+                        rowData(1, value, 15, day8),
+                        rowData(1, value, 16, day8Again),
+                        rowData(1, value, 15, day9));
+
+                writeData(
+                        rowData(2, value, 15, BinaryString.fromString("20221208")),
+                        rowData(2, value, 16, BinaryString.fromString("20221208")),
+                        rowData(2, value, 15, BinaryString.fromString("20221209")));
+
+                Long latestId = snapshotManager.latestSnapshotId();
+                Snapshot latest = snapshotManager.snapshot(latestId);
+                assertThat(latest.id()).isEqualTo(2);
+                assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+            }
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        CompactDatabaseAction action =
+                new CompactDatabaseAction(
+                        warehouse, "db1|db2", includingPattern, excludesPattern, new HashMap<>());
+        action.build(env);
+        env.execute();
+
+        // selected tables: one extra COMPACT snapshot, one file left per split
+        for (FileStoreTable table : expectCompacted) {
+            snapshotManager = table.snapshotManager();
+            Long latestId = snapshotManager.latestSnapshotId();
+            Snapshot latest = table.snapshotManager().snapshot(latestId);
+
+            assertThat(latest.id()).isEqualTo(3);
+            assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+
+            List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+            assertThat(splits.size()).isEqualTo(3);
+            for (DataSplit split : splits) {
+                assertThat(split.dataFiles().size()).isEqualTo(1);
+            }
+        }
+
+        // filtered-out tables: still the second APPEND snapshot with both written files per split
+        for (FileStoreTable table : expectUntouched) {
+            snapshotManager = table.snapshotManager();
+            Long latestId = snapshotManager.latestSnapshotId();
+            Snapshot latest = table.snapshotManager().snapshot(latestId);
+
+            assertThat(latest.id()).isEqualTo(2);
+            assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+
+            List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+            assertThat(splits.size()).isEqualTo(3);
+            for (DataSplit split : splits) {
+                assertThat(split.dataFiles().size()).isEqualTo(2);
+            }
+        }
+    }
+
+    /**
+     * Runs {@link CompactDatabaseAction} as a streaming job over {@code db1} and {@code db2}.
+     *
+     * <p>All tables are write-only and use the full-compaction changelog producer, so the stream
+     * scan yields nothing until the compaction job has run. For each table the test checks the
+     * changelog after the first and the second full compaction, and then waits until exactly
+     * three snapshots are retained.
+     */
+    @Test
+    public void testStreamingCompact() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
+        options.put(
+                FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(),
+                "1s");
+        options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+        options.put(CoreOptions.WRITE_ONLY.key(), "true");
+        // test that dedicated compact job will expire snapshots
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
+
+        List<FileStoreTable> tables = new ArrayList<>();
+        for (String dbName : DATABASE_NAMES) {
+            for (String tableName : TABLE_NAMES) {
+                FileStoreTable table =
+                        createTable(
+                                dbName,
+                                tableName,
+                                ROW_TYPE_MAP.get(tableName),
+                                Arrays.asList("dt", "hh"),
+                                Arrays.asList("dt", "hh", "k"),
+                                options);
+                tables.add(table);
+                snapshotManager = table.snapshotManager();
+                StreamWriteBuilder streamWriteBuilder =
+                        table.newStreamWriteBuilder().withCommitUser(commitUser);
+                write = streamWriteBuilder.newWrite();
+                commit = streamWriteBuilder.newCommit();
+
+                // the value column is INT in t1 and BIGINT in t2, so the literal type must match
+                Object value = null;
+                if (tableName.equals("t1")) {
+                    value = 100;
+                } else if (tableName.equals("t2")) {
+                    value = 100L;
+                }
+
+                // base records
+                writeData(
+                        rowData(1, value, 15, BinaryString.fromString("20221208")),
+                        rowData(1, value, 16, BinaryString.fromString("20221208")),
+                        rowData(1, value, 15, BinaryString.fromString("20221209")));
+
+                Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+                assertThat(snapshot.id()).isEqualTo(1);
+                assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+
+                // no full compaction has happened, so plan should be empty
+                StreamTableScan scan = table.newReadBuilder().newStreamScan();
+                TableScan.Plan plan = scan.plan();
+                assertThat(plan.splits()).isEmpty();
+            }
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        env.getCheckpointConfig().setCheckpointInterval(500);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        new CompactDatabaseAction(warehouse, "db1|db2", null, null, new HashMap<>()).build(env);
+        JobClient client = env.executeAsync();
+
+        try {
+            for (FileStoreTable table : tables) {
+                StreamTableScan scan = table.newReadBuilder().newStreamScan();
+                // first full compaction
+                validateResult(
+                        table,
+                        ROW_TYPE_MAP.get(table.name()),
+                        scan,
+                        Arrays.asList(
+                                "+I[1, 100, 15, 20221208]",
+                                "+I[1, 100, 15, 20221209]",
+                                "+I[1, 100, 16, 20221208]"),
+                        60_000);
+
+                Object value = null;
+                String tName = table.name();
+                if (tName.equals("t1")) {
+                    value = 101;
+                } else if (tName.equals("t2")) {
+                    value = 101L;
+                }
+
+                snapshotManager = table.snapshotManager();
+                StreamWriteBuilder streamWriteBuilder =
+                        table.newStreamWriteBuilder().withCommitUser(commitUser);
+                write = streamWriteBuilder.newWrite();
+                commit = streamWriteBuilder.newCommit();
+
+                // incremental records
+                writeData(
+                        rowData(1, value, 15, BinaryString.fromString("20221208")),
+                        rowData(1, value, 16, BinaryString.fromString("20221208")),
+                        rowData(1, value, 15, BinaryString.fromString("20221209")));
+
+                // second full compaction
+                validateResult(
+                        table,
+                        ROW_TYPE_MAP.get(table.name()),
+                        scan,
+                        Arrays.asList(
+                                "+U[1, 101, 15, 20221208]",
+                                "+U[1, 101, 15, 20221209]",
+                                "+U[1, 101, 16, 20221208]",
+                                "-U[1, 100, 15, 20221208]",
+                                "-U[1, 100, 15, 20221209]",
+                                "-U[1, 100, 16, 20221208]"),
+                        60_000);
+
+                // assert dedicated compact job will expire snapshots; the timeout and the poll
+                // pause are in milliseconds, matching the error message below
+                CommonTestUtils.waitUtil(
+                        () ->
+                                snapshotManager.latestSnapshotId() - 2
+                                        == snapshotManager.earliestSnapshotId(),
+                        Duration.ofMillis(60_000),
+                        Duration.ofMillis(100),
+                        String.format(
+                                "Cannot validate snapshot expiration in %s milliseconds.",
+                                60_000));
+            }
+        } finally {
+            // wait for the cancellation so the streaming job never outlives the test, even when
+            // one of the checks above fails
+            client.cancel().get();
+        }
+    }
+
+    /**
+     * Runs {@link CompactDatabaseAction} as a streaming job over tables created with {@code
+     * bucket = -1} and no primary keys.
+     *
+     * <p>Every table starts with two commits of three rows each. The test expects snapshot 3 to
+     * hold a single file with 6 rows, writes three more rows, and expects snapshot 5 to hold a
+     * single file with 9 rows.
+     */
+    @Test
+    public void testUnawareBucketStreamingCompact() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+        // bucket = -1 with both compaction file-number options set to 2
+        options.put(CoreOptions.BUCKET.key(), "-1");
+        options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+
+        List<FileStoreTable> tables = new ArrayList<>();
+        for (String tableName : TABLE_NAMES) {
+            FileStoreTable table =
+                    createTable(
+                            database,
+                            tableName,
+                            ROW_TYPE_MAP.get(tableName),
+                            Collections.singletonList("k"),
+                            Collections.emptyList(),
+                            options);
+            tables.add(table);
+            snapshotManager = table.snapshotManager();
+            StreamWriteBuilder streamWriteBuilder =
+                    table.newStreamWriteBuilder().withCommitUser(commitUser);
+            write = streamWriteBuilder.newWrite();
+            commit = streamWriteBuilder.newCommit();
+
+            // the value column is INT in t1 and BIGINT in t2, so the literal type must match
+            Object value = null;
+            if (tableName.equals("t1")) {
+                value = 100;
+            } else if (tableName.equals("t2")) {
+                value = 100L;
+            }
+
+            // base records
+            writeData(
+                    rowData(1, value, 15, BinaryString.fromString("20221208")),
+                    rowData(1, value, 16, BinaryString.fromString("20221208")),
+                    rowData(1, value, 15, BinaryString.fromString("20221209")));
+
+            writeData(
+                    rowData(1, value, 15, BinaryString.fromString("20221208")),
+                    rowData(1, value, 16, BinaryString.fromString("20221208")),
+                    rowData(1, value, 15, BinaryString.fromString("20221209")));
+
+            Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+            assertThat(snapshot.id()).isEqualTo(2);
+            assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        env.getCheckpointConfig().setCheckpointInterval(500);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        new CompactDatabaseAction(warehouse, database, null, null, new HashMap<>()).build(env);
+        JobClient client = env.executeAsync();
+
+        try {
+            for (FileStoreTable table : tables) {
+                FileStoreScan storeScan = table.store().newScan();
+
+                snapshotManager = table.snapshotManager();
+                StreamWriteBuilder streamWriteBuilder =
+                        table.newStreamWriteBuilder().withCommitUser(commitUser);
+                write = streamWriteBuilder.newWrite();
+                commit = streamWriteBuilder.newCommit();
+
+                // first compaction, snapshot will be 3
+                checkFileAndRowSize(storeScan, 3L, 30_000L, 1, 6);
+
+                Object value = null;
+                String tName = table.name();
+                if (tName.equals("t1")) {
+                    value = 101;
+                } else if (tName.equals("t2")) {
+                    value = 101L;
+                }
+
+                writeData(
+                        rowData(1, value, 15, BinaryString.fromString("20221208")),
+                        rowData(1, value, 16, BinaryString.fromString("20221208")),
+                        rowData(1, value, 15, BinaryString.fromString("20221209")));
+
+                // second compaction, snapshot will be 5
+                checkFileAndRowSize(storeScan, 5L, 30_000L, 1, 9);
+            }
+        } finally {
+            // cancel in a finally block so a failed check does not leave the job running
+            client.cancel().get();
+        }
+    }
+
+    @Test
+    public void testUnawareBucketBatchCompact() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        // test that dedicated compact job will expire snapshots
+        options.put(CoreOptions.BUCKET.key(), "-1");
+        options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+
+        List<FileStoreTable> tables = new ArrayList<>();
+        for (String tableName : TABLE_NAMES) {
+            FileStoreTable table =
+                    createTable(
+                            database,
+                            tableName,
+                            ROW_TYPE_MAP.get(tableName),
+                            Collections.singletonList("k"),
+                            Collections.emptyList(),
+                            options);
+            tables.add(table);
+            snapshotManager = table.snapshotManager();
+            StreamWriteBuilder streamWriteBuilder =
+                    table.newStreamWriteBuilder().withCommitUser(commitUser);
+            write = streamWriteBuilder.newWrite();
+            commit = streamWriteBuilder.newCommit();
+
+            Object value = null;
+            if (tableName.equals("t1")) {
+                value = 100;
+            } else if (tableName.equals("t2")) {
+                value = 100L;
+            }
+
+            // base records
+            writeData(
+                    rowData(1, value, 15, BinaryString.fromString("20221208")),
+                    rowData(1, value, 16, BinaryString.fromString("20221208")),
+                    rowData(1, value, 15, 
BinaryString.fromString("20221209")));
+
+            writeData(
+                    rowData(1, value, 15, BinaryString.fromString("20221208")),
+                    rowData(1, value, 16, BinaryString.fromString("20221208")),
+                    rowData(1, value, 15, 
BinaryString.fromString("20221209")));
+
+            Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+            assertThat(snapshot.id()).isEqualTo(2);
+            
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        }
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        new CompactDatabaseAction(warehouse, database, null, null, new 
HashMap<>()).build(env);
+        env.execute();
+
+        for (FileStoreTable table : tables) {
+            FileStoreScan storeScan = table.store().newScan();
+            snapshotManager = table.snapshotManager();
+            // first compaction, snapshot will be 3.
+            checkFileAndRowSize(storeScan, 3L, 0L, 1, 6);
+        }
+    }
+}

Reply via email to