This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bcf8576b3 [flink] Support minor compact strategy for dedicated compaction action. (#4589)
4bcf8576b3 is described below

commit 4bcf8576b3a5af66adb47dba7b8427581d080aab
Author: HunterXHunter <[email protected]>
AuthorDate: Wed Nov 27 19:11:39 2024 +0800

    [flink] Support minor compact strategy for dedicated compaction action. (#4589)
---
 docs/content/flink/procedures.md                   |  15 +-
 docs/content/maintenance/dedicated-compaction.md   |   8 +
 docs/content/spark/procedures.md                   |   4 +-
 .../flink/procedure/CompactDatabaseProcedure.java  |  13 +-
 .../paimon/flink/procedure/CompactProcedure.java   |  16 +-
 .../ProcedurePositionalArgumentsITCase.java        |   8 +-
 .../apache/paimon/flink/action/ActionFactory.java  |   4 +
 .../apache/paimon/flink/action/CompactAction.java  |  17 +-
 .../paimon/flink/action/CompactActionFactory.java  |  24 ++-
 .../paimon/flink/action/CompactDatabaseAction.java |  47 ++---
 .../flink/action/CompactDatabaseActionFactory.java |  15 +-
 .../flink/procedure/CompactDatabaseProcedure.java  |  13 +-
 .../paimon/flink/procedure/CompactProcedure.java   |  13 +-
 .../flink/sink/CombinedTableCompactorSink.java     |  17 +-
 .../paimon/flink/sink/CompactorSinkBuilder.java    |  10 +-
 .../sink/MultiTablesStoreCompactOperator.java      |   6 +-
 .../paimon/flink/action/CompactActionITCase.java   |  35 ----
 .../flink/action/CompactActionITCaseBase.java      |  36 ++++
 .../flink/action/MinorCompactActionITCase.java     | 205 +++++++++++++++++++++
 .../flink/procedure/CompactProcedureITCase.java    | 112 +++++++++++
 .../paimon/flink/sink/CompactorSinkITCase.java     |  14 +-
 .../paimon/spark/procedure/CompactProcedure.java   |  35 +++-
 .../spark/procedure/CompactProcedureTestBase.scala |  50 +++++
 23 files changed, 622 insertions(+), 95 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 7e669a89d4..59b02f82bf 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -67,7 +67,8 @@ All available procedures are listed below.
             order_by => 'order_by', 
             options => 'options', 
             `where` => 'where', 
-            partition_idle_time => 'partition_idle_time') <br/><br/>
+            partition_idle_time => 'partition_idle_time',
+            compact_strategy => 'compact_strategy') <br/><br/>
          -- Use indexed argument<br/>
          CALL [catalog.]sys.compact('table') <br/><br/>
          CALL [catalog.]sys.compact('table', 'partitions') <br/><br/> 
@@ -76,6 +77,7 @@ All available procedures are listed below.
         CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options') <br/><br/>
         CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where') <br/><br/>
         CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time') <br/><br/>
+         CALL [catalog.]sys.compact('table', 'partitions', 'order_strategy', 'order_by', 'options', 'where', 'partition_idle_time', 'compact_strategy') <br/><br/>
       </td>
       <td>
          To compact a table. Arguments:
@@ -86,6 +88,7 @@ All available procedures are listed below.
            <li>options(optional): additional dynamic options of the table.</li>
            <li>where(optional): partition predicate(Can't be used together with "partitions"). Note: as where is a keyword,a pair of backticks need to add around like `where`.</li>
            <li>partition_idle_time(optional): this is used to do a full compaction for partition which had not received any new data for 'partition_idle_time'. And only these partitions will be compacted. This argument can not be used with order compact.</li>
+            <li>compact_strategy(optional): this determines how to pick files to be merged; the default is determined by the runtime execution mode. The 'full' strategy is only supported in batch mode, and all files will be selected for merging. The 'minor' strategy picks the set of files that need to be merged based on specified conditions.</li>
       </td>
       <td>
          -- use partition filter <br/>
@@ -104,7 +107,8 @@ All available procedures are listed below.
             including_tables => 'includingTables', 
             excluding_tables => 'excludingTables', 
             table_options => 'tableOptions', 
-            partition_idle_time => 'partitionIdleTime') <br/><br/>
+            partition_idle_time => 'partitionIdleTime',
+            compact_strategy => 'compact_strategy') <br/><br/>
          -- Use indexed argument<br/>
          CALL [catalog.]sys.compact_database() <br/><br/>
          CALL [catalog.]sys.compact_database('includingDatabases') <br/><br/> 
@@ -112,7 +116,8 @@ All available procedures are listed below.
         CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables') <br/><br/> 
         CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables') <br/><br/>
         CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions') <br/><br/>
-         CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')
+         CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')<br/><br/>
+         CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime', 'compact_strategy')<br/><br/>
       </td>
       <td>
          To compact databases. Arguments:
@@ -124,6 +129,7 @@ All available procedures are listed below.
            <li>excludingTables: to specify tables that are not compacted. You can use regular expression.</li>
            <li>tableOptions: additional dynamic options of the table.</li>
            <li>partition_idle_time: this is used to do a full compaction for partition which had not received any new data for 'partition_idle_time'. And only these partitions will be compacted.</li>
+            <li>compact_strategy(optional): this determines how to pick files to be merged; the default is determined by the runtime execution mode. The 'full' strategy is only supported in batch mode, and all files will be selected for merging. The 'minor' strategy picks the set of files that need to be merged based on specified conditions.</li>
       </td>
       <td>
          CALL sys.compact_database(
@@ -131,7 +137,8 @@ All available procedures are listed below.
             mode => 'combined', 
             including_tables => 'table_.*', 
             excluding_tables => 'ignore', 
-            table_options => 'sink.parallelism=4')
+            table_options => 'sink.parallelism=4',
+            compact_strategy => 'full')
       </td>
    </tr>
    <tr>
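
For reference, a minimal Java sketch of invoking the updated procedure through a Flink TableEnvironment; the catalog name 'paimon' and table 'default.T' are illustrative assumptions, not part of this commit:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactStrategyCall {
    public static void main(String[] args) throws Exception {
        // The 'full' strategy requires batch mode; 'minor' works in both modes.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        // Assumes a Paimon catalog named 'paimon' is already registered.
        tEnv.useCatalog("paimon");
        tEnv.executeSql("CALL sys.compact(`table` => 'default.T', compact_strategy => 'minor')")
                .await();
    }
}
```
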
diff --git a/docs/content/maintenance/dedicated-compaction.md b/docs/content/maintenance/dedicated-compaction.md
index c0010bf9cc..63e0aa5e66 100644
--- a/docs/content/maintenance/dedicated-compaction.md
+++ b/docs/content/maintenance/dedicated-compaction.md
@@ -107,6 +107,7 @@ Run the following command to submit a compaction job for the table.
     --database <database-name> \ 
     --table <table-name> \
     [--partition <partition-name>] \
+    [--compact_strategy <minor / full>] \
     [--table_conf <table_conf>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
 ```
@@ -123,10 +124,14 @@ Example: compact table
     --partition dt=20221126,hh=08 \
     --partition dt=20221127,hh=09 \
     --table_conf sink.parallelism=10 \
+    --compact_strategy minor \
     --catalog_conf s3.endpoint=https://****.com \
     --catalog_conf s3.access-key=***** \
     --catalog_conf s3.secret-key=*****
 ```
+* `--compact_strategy` determines how to pick files to be merged; the default is determined by the runtime execution mode: streaming mode uses the `minor` strategy and batch mode uses the `full` strategy.
+  * `full` : Only supported in batch mode. All files will be selected for merging.
+  * `minor` : Pick the set of files that need to be merged based on specified conditions.
 
You can use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to control batch or streaming mode. If you submit a batch job, all
current table files will be compacted. If you submit a streaming job, the job will continuously monitor new changes
@@ -190,6 +195,7 @@ CALL sys.compact_database(
     [--including_tables <paimon-table-name|name-regular-expr>] \
     [--excluding_tables <paimon-table-name|name-regular-expr>] \
     [--mode <compact-mode>] \
+    [--compact_strategy <minor / full>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
     [--table_conf <paimon-table_conf> [--table_conf <paimon-table_conf> ...]]
 ```
@@ -346,6 +352,7 @@ CALL sys.compact(`table` => 'default.T', 'partition_idle_time' => '1 d')
     --table <table-name> \
     --partition_idle_time <partition-idle-time> \ 
     [--partition <partition-name>] \
+    [--compact_strategy <minor / full>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
    [--table_conf <paimon-table-dynamic-conf> [--table_conf <paimon-table-dynamic-conf>] ...]
 ```
@@ -406,6 +413,7 @@ CALL sys.compact_database(
     [--including_tables <paimon-table-name|name-regular-expr>] \
     [--excluding_tables <paimon-table-name|name-regular-expr>] \
     [--mode <compact-mode>] \
+    [--compact_strategy <minor / full>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
     [--table_conf <paimon-table_conf> [--table_conf <paimon-table_conf> ...]]
 ```
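
The defaulting rule above can be summarized in a small sketch that mirrors the logic this commit adds to CompactAction; the class and helper names here are hypothetical:

```java
public class CompactStrategyDefaults {
    // Hypothetical helper mirroring the strategy resolution described above:
    // no explicit strategy -> batch implies 'full', streaming implies 'minor';
    // an explicit 'full' in streaming mode is rejected.
    static boolean resolveFullCompaction(String compactStrategy, boolean isStreaming) {
        if (compactStrategy == null) {
            return !isStreaming;
        }
        boolean full = compactStrategy.trim().equalsIgnoreCase("full");
        if (full && isStreaming) {
            throw new IllegalArgumentException(
                    "The full compact strategy is only supported in batch mode. "
                            + "Please add -Dexecution.runtime-mode=BATCH.");
        }
        return full;
    }
}
```
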
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 1f3f554106..88d46fabbb 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -47,12 +47,14 @@ This section introduce all available spark procedures about paimon.
            <li>order_strategy: 'order' or 'zorder' or 'hilbert' or 'none'. Left empty for 'none'.</li>
            <li>order_columns: the columns need to be sort. Left empty if 'order_strategy' is 'none'.</li>
            <li>partition_idle_time: this is used to do a full compaction for partition which had not received any new data for 'partition_idle_time'. And only these partitions will be compacted. This argument can not be used with order compact.</li>
+            <li>compact_strategy: this determines how to pick files to be merged; the default is determined by the runtime execution mode. The 'full' strategy is only supported in batch mode, and all files will be selected for merging. The 'minor' strategy picks the set of files that need to be merged based on specified conditions.</li>
       </td>
       <td>
         SET spark.sql.shuffle.partitions=10; --set the compact parallelism <br/><br/>
         CALL sys.compact(table => 'T', partitions => 'p=0;p=1',  order_strategy => 'zorder', order_by => 'a,b') <br/><br/>
         CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy => 'zorder', order_by => 'a,b') <br/><br/>
-         CALL sys.compact(table => 'T', partition_idle_time => '60s')
+         CALL sys.compact(table => 'T', partition_idle_time => '60s')<br/><br/>
+         CALL sys.compact(table => 'T', compact_strategy => 'minor')<br/><br/>
       </td>
     </tr>
     <tr>
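
For completeness, the equivalent call from Java through a SparkSession might look like this sketch (the session setup and the table name 'T' are assumptions):

```java
import org.apache.spark.sql.SparkSession;

public class SparkCompactStrategyCall {
    public static void main(String[] args) {
        SparkSession spark =
                SparkSession.builder().appName("paimon-compact-example").getOrCreate();
        // 'minor' only picks files that meet the compaction trigger conditions.
        spark.sql("CALL sys.compact(table => 'T', compact_strategy => 'minor')");
        spark.stop();
    }
}
```
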
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
index 99f205bacb..ac4340c113 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.procedure.ProcedureContext;
 
 import java.util.Map;
 
+import static org.apache.paimon.flink.action.ActionFactory.FULL;
+import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;
 import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
 
 /**
@@ -51,6 +53,7 @@ import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValue
  *
  *  -- set table options ('k=v,...')
 *  CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 'excludingTables', 'tableOptions')
+ *
  * </code></pre>
  */
 public class CompactDatabaseProcedure extends ProcedureBase {
@@ -106,7 +109,8 @@ public class CompactDatabaseProcedure extends ProcedureBase {
                 includingTables,
                 excludingTables,
                 tableOptions,
-                "");
+                "",
+                null);
     }
 
     public String[] call(
@@ -116,7 +120,8 @@ public class CompactDatabaseProcedure extends ProcedureBase {
             String includingTables,
             String excludingTables,
             String tableOptions,
-            String partitionIdleTime)
+            String partitionIdleTime,
+            String compactStrategy)
             throws Exception {
         String warehouse = catalog.warehouse();
         Map<String, String> catalogOptions = catalog.options();
@@ -133,6 +138,10 @@ public class CompactDatabaseProcedure extends ProcedureBase {
             action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
         }
 
+        if (checkCompactStrategy(compactStrategy)) {
+            action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
+        }
+
         return execute(procedureContext, action, "Compact database job");
     }
 
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 63aa6c906b..560e532a6d 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -31,6 +31,9 @@ import org.apache.flink.table.procedure.ProcedureContext;
 import java.util.Collections;
 import java.util.Map;
 
+import static org.apache.paimon.flink.action.ActionFactory.FULL;
+import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;
+
 /**
 * Stay compatible with 1.18 procedure which doesn't support named argument. Usage:
  *
@@ -49,6 +52,9 @@ import java.util.Map;
  *  -- compact specific partitions with sorting
 *  CALL sys.compact('tableId', 'partitions', 'ORDER/ZORDER', 'col1,col2', 'sink.parallelism=6')
  *
+ *  -- compact with specific compact strategy
+ *  CALL sys.compact('tableId', 'partitions', 'ORDER/ZORDER', 'col1,col2', 'sink.parallelism=6', 'minor')
+ *
  * </code></pre>
  */
 public class CompactProcedure extends ProcedureBase {
@@ -118,7 +124,8 @@ public class CompactProcedure extends ProcedureBase {
                 orderByColumns,
                 tableOptions,
                 whereSql,
-                "");
+                "",
+                null);
     }
 
     public String[] call(
@@ -129,7 +136,8 @@ public class CompactProcedure extends ProcedureBase {
             String orderByColumns,
             String tableOptions,
             String whereSql,
-            String partitionIdleTime)
+            String partitionIdleTime,
+            String compactStrategy)
             throws Exception {
 
         String warehouse = catalog.warehouse();
@@ -152,6 +160,10 @@ public class CompactProcedure extends ProcedureBase {
            if (!(StringUtils.isNullOrWhitespaceOnly(partitionIdleTime))) {
                action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
             }
+
+            if (checkCompactStrategy(compactStrategy)) {
+                action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
+            }
             jobName = "Compact Job";
         } else if (!orderStrategy.isEmpty() && !orderByColumns.isEmpty()) {
             Preconditions.checkArgument(
diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index f2385e66d2..f79d6fb716 100644
--- a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -68,10 +68,16 @@ public class ProcedurePositionalArgumentsITCase extends CatalogITCaseBase {
                                 sql(
                                         "CALL sys.compact('default.T', '', '', 
'', 'sink.parallelism=1','pt=1')"))
                 .doesNotThrowAnyException();
-        assertThatCode(() -> sql("CALL sys.compact('default.T', '', 'zorder', 
'k', '','','5s')"))
+        assertThatCode(
+                        () ->
+                                sql(
+                                        "CALL sys.compact('default.T', '' 
,'zorder', 'k', '','','5s', '')"))
                 .message()
                 .contains("sort compact do not support 
'partition_idle_time'.");
 
+        assertThatCode(() -> sql("CALL sys.compact('default.T', '', '' ,'', 
'', '', '', 'full')"))
+                .doesNotThrowAnyException();
+
         assertThatCode(() -> sql("CALL sys.compact_database('default')"))
                 .doesNotThrowAnyException();
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
index 43719f715d..fbf8f12f49 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
@@ -58,6 +58,10 @@ public interface ActionFactory extends Factory {
     String TIMESTAMPFORMATTER = "timestamp_formatter";
     String EXPIRE_STRATEGY = "expire_strategy";
     String TIMESTAMP_PATTERN = "timestamp_pattern";
+    // Supports `full` and `minor`.
+    String COMPACT_STRATEGY = "compact_strategy";
+    String MINOR = "minor";
+    String FULL = "full";
 
     Optional<Action> create(MultipleParameterToolAdapter params);
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index ce88857f1b..84e37a5b10 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -59,6 +59,8 @@ public class CompactAction extends TableActionBase {
 
     @Nullable private Duration partitionIdleTime = null;
 
+    private Boolean fullCompaction;
+
     public CompactAction(String warehouse, String database, String tableName) {
        this(warehouse, database, tableName, Collections.emptyMap(), Collections.emptyMap());
     }
@@ -100,6 +102,11 @@ public class CompactAction extends TableActionBase {
         return this;
     }
 
+    public CompactAction withFullCompaction(Boolean fullCompaction) {
+        this.fullCompaction = fullCompaction;
+        return this;
+    }
+
     @Override
     public void build() throws Exception {
         ReadableConfig conf = env.getConfiguration();
@@ -124,6 +131,13 @@ public class CompactAction extends TableActionBase {
     private void buildForTraditionalCompaction(
            StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
             throws Exception {
+        if (fullCompaction == null) {
+            fullCompaction = !isStreaming;
+        } else {
+            Preconditions.checkArgument(
+                    !(fullCompaction && isStreaming),
+                    "The full compact strategy is only supported in batch 
mode. Please add -Dexecution.runtime-mode=BATCH.");
+        }
         if (isStreaming) {
             // for completely asynchronous compaction
             HashMap<String, String> dynamicOptions =
@@ -138,8 +152,7 @@ public class CompactAction extends TableActionBase {
         }
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(identifier.getFullName(), table);
-        CompactorSinkBuilder sinkBuilder =
-                new CompactorSinkBuilder(table).withFullCompaction(!isStreaming);
+        CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table, fullCompaction);
 
         sourceBuilder.withPartitionPredicate(getPredicate());
         DataStreamSource<RowData> source =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index f43c7a747c..fc60a870ea 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -76,6 +76,10 @@ public class CompactActionFactory implements ActionFactory {
                action.withPartitionIdleTime(
                        TimeUtils.parseDuration(params.get(PARTITION_IDLE_TIME)));
             }
+            String compactStrategy = params.get(COMPACT_STRATEGY);
+            if (checkCompactStrategy(compactStrategy)) {
+                action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
+            }
         }
 
         if (params.has(PARTITION)) {
@@ -88,6 +92,19 @@ public class CompactActionFactory implements ActionFactory {
         return Optional.of(action);
     }
 
+    public static boolean checkCompactStrategy(String compactStrategy) {
+        if (compactStrategy != null) {
+            Preconditions.checkArgument(
+                    compactStrategy.equalsIgnoreCase(MINOR)
+                            || compactStrategy.equalsIgnoreCase(FULL),
+                    String.format(
+                            "The compact strategy only supports 'full' or 
'minor', but '%s' is configured.",
+                            compactStrategy));
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public void printHelp() {
         System.out.println(
@@ -101,7 +118,8 @@ public class CompactActionFactory implements ActionFactory {
                         + "[--order_strategy <order_strategy>]"
                         + "[--table_conf <key>=<value>]"
                         + "[--order_by <order_columns>]"
-                        + "[--partition_idle_time <partition_idle_time>]");
+                        + "[--partition_idle_time <partition_idle_time>]"
+                        + "[--compact_strategy <compact_strategy>]");
         System.out.println(
                 "  compact --warehouse s3://path/to/warehouse --database 
<database_name> "
                         + "--table <table_name> [--catalog_conf 
<paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]]");
@@ -132,6 +150,10 @@ public class CompactActionFactory implements ActionFactory {
         System.out.println(
                 "  compact --warehouse hdfs:///path/to/warehouse --database test_db --table test_table "
                         + "--partition_idle_time 10s");
+        System.out.println(
+                "--compact_strategy determines how to pick files to be merged, the default is determined by the runtime execution mode. "
+                        + "`full` : Only supports batch mode. All files will be selected for merging. "
+                        + "`minor`: Pick the set of files that need to be merged based on specified conditions.");
         System.out.println(
                 "  compact --warehouse s3:///path/to/warehouse "
                         + "--database test_db "
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
index 471c6fdd4d..124d3ca687 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -72,6 +72,10 @@ public class CompactDatabaseAction extends ActionBase {
 
     @Nullable private Duration partitionIdleTime = null;
 
+    private Boolean fullCompaction;
+
+    private boolean isStreaming;
+
     public CompactDatabaseAction(String warehouse, Map<String, String> catalogConfig) {
         super(warehouse, catalogConfig);
     }
@@ -110,6 +114,11 @@ public class CompactDatabaseAction extends ActionBase {
         return this;
     }
 
+    public CompactDatabaseAction withFullCompaction(boolean fullCompaction) {
+        this.fullCompaction = fullCompaction;
+        return this;
+    }
+
     private boolean shouldCompactionTable(String paimonFullTableName) {
        boolean shouldCompaction = includingPattern.matcher(paimonFullTableName).matches();
         if (excludingPattern != null) {
@@ -124,6 +133,12 @@ public class CompactDatabaseAction extends ActionBase {
 
     @Override
     public void build() {
+        ReadableConfig conf = env.getConfiguration();
+        isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
+
+        if (fullCompaction == null) {
+            fullCompaction = !isStreaming;
+        }
         if (databaseCompactMode == MultiTablesSinkMode.DIVIDED) {
             buildForDividedMode();
         } else {
@@ -170,24 +185,19 @@ public class CompactDatabaseAction extends ActionBase {
                 !tableMap.isEmpty(),
                 "no tables to be compacted. possible cause is that there are 
no tables detected after pattern matching");
 
-        ReadableConfig conf = env.getConfiguration();
-        boolean isStreaming =
-                conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
         for (Map.Entry<String, FileStoreTable> entry : tableMap.entrySet()) {
             FileStoreTable fileStoreTable = entry.getValue();
             switch (fileStoreTable.bucketMode()) {
                 case BUCKET_UNAWARE:
                     {
-                        buildForUnawareBucketCompaction(
-                                env, entry.getKey(), fileStoreTable, isStreaming);
+                        buildForUnawareBucketCompaction(env, entry.getKey(), fileStoreTable);
                         break;
                     }
                 case HASH_FIXED:
                 case HASH_DYNAMIC:
                 default:
                     {
-                        buildForTraditionalCompaction(
-                                env, entry.getKey(), fileStoreTable, isStreaming);
+                        buildForTraditionalCompaction(env, entry.getKey(), fileStoreTable);
                     }
             }
         }
@@ -195,9 +205,6 @@ public class CompactDatabaseAction extends ActionBase {
 
     private void buildForCombinedMode() {
 
-        ReadableConfig conf = env.getConfiguration();
-        boolean isStreaming =
-                conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
         CombinedTableCompactorSourceBuilder sourceBuilder =
                 new CombinedTableCompactorSourceBuilder(
                                 catalogLoader(),
@@ -234,15 +241,17 @@ public class CompactDatabaseAction extends ActionBase {
                                 .buildForUnawareBucketsTableSource(),
                         parallelism);
 
-        new CombinedTableCompactorSink(catalogLoader(), tableOptions)
+        new CombinedTableCompactorSink(catalogLoader(), tableOptions, fullCompaction)
                 .sinkFrom(awareBucketTableSource, unawareBucketTableSource);
     }
 
     private void buildForTraditionalCompaction(
-            StreamExecutionEnvironment env,
-            String fullName,
-            FileStoreTable table,
-            boolean isStreaming) {
+            StreamExecutionEnvironment env, String fullName, FileStoreTable table) {
+
+        Preconditions.checkArgument(
+                !(fullCompaction && isStreaming),
+                "The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
+
         if (isStreaming) {
             // for completely asynchronous compaction
             HashMap<String, String> dynamicOptions =
@@ -259,8 +268,7 @@ public class CompactDatabaseAction extends ActionBase {
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(fullName, table)
                         .withPartitionIdleTime(partitionIdleTime);
-        CompactorSinkBuilder sinkBuilder =
-                new CompactorSinkBuilder(table).withFullCompaction(!isStreaming);
+        CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table, fullCompaction);
 
        DataStreamSource<RowData> source =
                sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
@@ -268,10 +276,7 @@ public class CompactDatabaseAction extends ActionBase {
     }
 
     private void buildForUnawareBucketCompaction(
-            StreamExecutionEnvironment env,
-            String fullName,
-            FileStoreTable table,
-            boolean isStreaming) {
+            StreamExecutionEnvironment env, String fullName, FileStoreTable table) {
         UnawareBucketCompactionTopoBuilder unawareBucketCompactionTopoBuilder =
                 new UnawareBucketCompactionTopoBuilder(env, fullName, table);
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
index b268709078..5672f99dc3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
@@ -22,6 +22,8 @@ import org.apache.paimon.utils.TimeUtils;
 
 import java.util.Optional;
 
+import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;
+
 /** Factory to create {@link CompactDatabaseAction}. */
 public class CompactDatabaseActionFactory implements ActionFactory {
 
@@ -55,6 +57,11 @@ public class CompactDatabaseActionFactory implements ActionFactory {
            action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
         }
 
+        String compactStrategy = params.get(COMPACT_STRATEGY);
+        if (checkCompactStrategy(compactStrategy)) {
+            action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
+        }
+
         return Optional.of(action);
     }
 
@@ -70,7 +77,8 @@ public class CompactDatabaseActionFactory implements ActionFactory {
                        + "[--including_tables <paimon_table_name|name_regular_expr>] "
                        + "[--excluding_tables <paimon_table_name|name_regular_expr>] "
                         + "[--mode <compact_mode>]"
-                        + "[--partition_idle_time <partition_idle_time>]");
+                        + "[--partition_idle_time <partition_idle_time>]"
+                        + "[--compact_strategy <compact_strategy>]");
         System.out.println(
                 "  compact_database --warehouse s3://path/to/warehouse 
--including_databases <database-name|name-regular-expr> "
                         + "[--catalog_conf <paimon_catalog_conf> 
[--catalog_conf <paimon_catalog_conf> ...]]");
@@ -93,6 +101,11 @@ public class CompactDatabaseActionFactory implements ActionFactory {
         System.out.println(
                 "--partition_idle_time is used to do a full compaction for partition which had not receive any new data for 'partition_idle_time' time. And only these partitions will be compacted.");
         System.out.println("--partition_idle_time is only supported in batch mode. ");
+        System.out.println(
+                "--compact_strategy determines how to pick files to be merged, the default is determined by the runtime execution mode. "
+                        + "`full` : Only supports batch mode. All files will be selected for merging. "
+                        + "`minor`: Pick the set of files that need to be merged based on specified conditions.");
+
         System.out.println();
 
         System.out.println("Examples:");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
index dd71e974c7..80602b755a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -29,6 +29,8 @@ import org.apache.flink.table.procedure.ProcedureContext;
 
 import java.util.Map;
 
+import static org.apache.paimon.flink.action.ActionFactory.FULL;
+import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;
 import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
 
 /**
@@ -82,6 +84,10 @@ public class CompactDatabaseProcedure extends ProcedureBase {
                 @ArgumentHint(
                         name = "partition_idle_time",
                         type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "compact_strategy",
+                        type = @DataTypeHint("STRING"),
                         isOptional = true)
             })
     public String[] call(
@@ -91,7 +97,8 @@ public class CompactDatabaseProcedure extends ProcedureBase {
             String includingTables,
             String excludingTables,
             String tableOptions,
-            String partitionIdleTime)
+            String partitionIdleTime,
+            String compactStrategy)
             throws Exception {
         partitionIdleTime = notnull(partitionIdleTime);
         String warehouse = catalog.warehouse();
@@ -109,6 +116,10 @@ public class CompactDatabaseProcedure extends ProcedureBase {
            action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
         }
 
+        if (checkCompactStrategy(compactStrategy)) {
+            action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
+        }
+
         return execute(procedureContext, action, "Compact database job");
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 8589069126..282f5af340 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -32,6 +32,8 @@ import org.apache.flink.table.procedure.ProcedureContext;
 import java.util.Collections;
 import java.util.Map;
 
+import static org.apache.paimon.flink.action.ActionFactory.FULL;
+import static org.apache.paimon.flink.action.CompactActionFactory.checkCompactStrategy;
 import static org.apache.paimon.utils.ParameterUtils.getPartitions;
 import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
 import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
@@ -58,6 +60,10 @@ public class CompactProcedure extends ProcedureBase {
                 @ArgumentHint(
                         name = "partition_idle_time",
                         type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "compact_strategy",
+                        type = @DataTypeHint("STRING"),
                         isOptional = true)
             })
     public String[] call(
@@ -68,7 +74,8 @@ public class CompactProcedure extends ProcedureBase {
             String orderByColumns,
             String tableOptions,
             String where,
-            String partitionIdleTime)
+            String partitionIdleTime,
+            String compactStrategy)
             throws Exception {
         String warehouse = catalog.warehouse();
         Map<String, String> catalogOptions = catalog.options();
@@ -90,6 +97,10 @@ public class CompactProcedure extends ProcedureBase {
            if (!isNullOrWhitespaceOnly(partitionIdleTime)) {
                action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
             }
+
+            if (checkCompactStrategy(compactStrategy)) {
+                action.withFullCompaction(compactStrategy.trim().equalsIgnoreCase(FULL));
+            }
             jobName = "Compact Job";
         } else if (!isNullOrWhitespaceOnly(orderStrategy)
                 && !isNullOrWhitespaceOnly(orderByColumns)) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index 87a28091fa..ce4e373059 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -57,11 +57,15 @@ public class CombinedTableCompactorSink implements Serializable {
 
     private final Catalog.Loader catalogLoader;
     private final boolean ignorePreviousFiles;
+    private final boolean fullCompaction;
+
     private final Options options;
 
-    public CombinedTableCompactorSink(Catalog.Loader catalogLoader, Options options) {
+    public CombinedTableCompactorSink(
+            Catalog.Loader catalogLoader, Options options, boolean fullCompaction) {
         this.catalogLoader = catalogLoader;
         this.ignorePreviousFiles = false;
+        this.fullCompaction = fullCompaction;
         this.options = options;
     }
 
@@ -104,7 +108,10 @@ public class CombinedTableCompactorSink implements Serializable {
                                String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME),
                                 new MultiTableCommittableTypeInfo(),
                                 combinedMultiComacptionWriteOperator(
-                                        env.getCheckpointConfig(), isStreaming, commitUser))
+                                        env.getCheckpointConfig(),
+                                        isStreaming,
+                                        fullCompaction,
+                                        commitUser))
                        .setParallelism(awareBucketTableSource.getParallelism());
 
        SingleOutputStreamOperator<MultiTableCommittable> unawareBucketTableRewriter =
@@ -168,13 +175,17 @@ public class CombinedTableCompactorSink implements Serializable {
     // TODO:refactor FlinkSink to adopt this sink
     protected OneInputStreamOperator<RowData, MultiTableCommittable>
             combinedMultiComacptionWriteOperator(
-                    CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) {
+                    CheckpointConfig checkpointConfig,
+                    boolean isStreaming,
+                    boolean fullCompaction,
+                    String commitUser) {
         return new MultiTablesStoreCompactOperator(
                 catalogLoader,
                 commitUser,
                 checkpointConfig,
                 isStreaming,
                 ignorePreviousFiles,
+                fullCompaction,
                 options);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
index 2173b1d34a..2d84ae6726 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
@@ -37,10 +37,11 @@ public class CompactorSinkBuilder {
 
     private DataStream<RowData> input;
 
-    private boolean fullCompaction;
+    private final boolean fullCompaction;
 
-    public CompactorSinkBuilder(FileStoreTable table) {
+    public CompactorSinkBuilder(FileStoreTable table, boolean fullCompaction) {
         this.table = table;
+        this.fullCompaction = fullCompaction;
     }
 
     public CompactorSinkBuilder withInput(DataStream<RowData> input) {
@@ -48,11 +49,6 @@ public class CompactorSinkBuilder {
         return this;
     }
 
-    public CompactorSinkBuilder withFullCompaction(boolean fullCompaction) {
-        this.fullCompaction = fullCompaction;
-        return this;
-    }
-
     public DataStreamSink<?> build() {
         BucketMode bucketMode = table.bucketMode();
         switch (bucketMode) {
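
The builder change above replaces the withFullCompaction setter with a constructor parameter, so the flag can be final. Caller usage now looks like this sketch ('table', 'fullCompaction' and 'source' stand in for the caller's values):

```java
// Before this commit, the flag was set via a setter:
//     new CompactorSinkBuilder(table).withFullCompaction(!isStreaming);
// After this commit it is passed at construction time and is immutable.
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table, fullCompaction);
sinkBuilder.withInput(source).build();
```
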
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 8a1d3a02df..57d2e8413c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -63,6 +63,7 @@ public class MultiTablesStoreCompactOperator
     private final CheckpointConfig checkpointConfig;
     private final boolean isStreaming;
     private final boolean ignorePreviousFiles;
+    private final boolean fullCompaction;
     private final String initialCommitUser;
 
     private transient StoreSinkWriteState state;
@@ -81,6 +82,7 @@ public class MultiTablesStoreCompactOperator
             CheckpointConfig checkpointConfig,
             boolean isStreaming,
             boolean ignorePreviousFiles,
+            boolean fullCompaction,
             Options options) {
         super(options);
         this.catalogLoader = catalogLoader;
@@ -88,6 +90,7 @@ public class MultiTablesStoreCompactOperator
         this.checkpointConfig = checkpointConfig;
         this.isStreaming = isStreaming;
         this.ignorePreviousFiles = ignorePreviousFiles;
+        this.fullCompaction = fullCompaction;
     }
 
     @Override
@@ -162,13 +165,14 @@ public class MultiTablesStoreCompactOperator
 
         if (write.streamingMode()) {
             write.notifyNewFiles(snapshotId, partition, bucket, files);
+            // The full compact is not supported in streaming mode.
             write.compact(partition, bucket, false);
         } else {
             Preconditions.checkArgument(
                     files.isEmpty(),
                     "Batch compact job does not concern what files are 
compacted. "
                             + "They only need to know what buckets are 
compacted.");
-            write.compact(partition, bucket, true);
+            write.compact(partition, bucket, fullCompaction);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index bc849f0a13..2c4fb64f33 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -23,13 +23,9 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommonTestUtils;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -56,12 +52,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** IT cases for {@link CompactAction}. */
 public class CompactActionITCase extends CompactActionITCaseBase {
 
-    private static final DataType[] FIELD_TYPES =
-            new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()};
-
-    private static final RowType ROW_TYPE =
-            RowType.of(FIELD_TYPES, new String[] {"k", "v", "hh", "dt"});
-
     @Test
     @Timeout(60)
     public void testBatchCompact() throws Exception {
@@ -402,31 +392,6 @@ public class CompactActionITCase extends CompactActionITCaseBase {
                .hasMessage("sort compact do not support 'partition_idle_time'.");
     }
 
-    private FileStoreTable prepareTable(
-            List<String> partitionKeys,
-            List<String> primaryKeys,
-            List<String> bucketKey,
-            Map<String, String> tableOptions)
-            throws Exception {
-        FileStoreTable table =
-                createFileStoreTable(ROW_TYPE, partitionKeys, primaryKeys, bucketKey, tableOptions);
-
-        StreamWriteBuilder streamWriteBuilder =
-                table.newStreamWriteBuilder().withCommitUser(commitUser);
-        write = streamWriteBuilder.newWrite();
-        commit = streamWriteBuilder.newCommit();
-
-        return table;
-    }
-
-    private void checkLatestSnapshot(
-            FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) {
-        SnapshotManager snapshotManager = table.snapshotManager();
-        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
-        assertThat(snapshot.id()).isEqualTo(snapshotId);
-        assertThat(snapshot.commitKind()).isEqualTo(commitKind);
-    }
-
     private void runAction(boolean isStreaming) throws Exception {
         runAction(isStreaming, false);
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
index 4c646444cb..41d01bdf7f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
@@ -18,17 +18,22 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeoutException;
 
@@ -37,6 +42,12 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Base IT cases for {@link CompactAction} and {@link CompactDatabaseAction} . */
 public class CompactActionITCaseBase extends ActionITCaseBase {
 
+    protected static final DataType[] FIELD_TYPES =
+            new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()};
+
+    protected static final RowType ROW_TYPE =
+            RowType.of(FIELD_TYPES, new String[] {"k", "v", "hh", "dt"});
+
     protected void validateResult(
             FileStoreTable table,
             RowType rowType,
@@ -87,4 +98,29 @@ public class CompactActionITCaseBase extends ActionITCaseBase {
         assertThat(files.size()).isEqualTo(fileNum);
         assertThat(count).isEqualTo(rowCount);
     }
+
+    protected void checkLatestSnapshot(
+            FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        assertThat(snapshot.id()).isEqualTo(snapshotId);
+        assertThat(snapshot.commitKind()).isEqualTo(commitKind);
+    }
+
+    protected FileStoreTable prepareTable(
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            List<String> bucketKey,
+            Map<String, String> tableOptions)
+            throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(ROW_TYPE, partitionKeys, primaryKeys, bucketKey, tableOptions);
+
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
+
+        return table;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java
new file mode 100644
index 0000000000..0373eb01a2
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MinorCompactActionITCase.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for compact strategy {@link CompactAction}. */
+public class MinorCompactActionITCase extends CompactActionITCaseBase {
+
+    @Test
+    @Timeout(60)
+    public void testBatchMinorCompactStrategy() throws Exception {
+        FileStoreTable table =
+                prepareTable(
+                        Arrays.asList("dt", "hh"),
+                        Arrays.asList("dt", "hh", "k"),
+                        Collections.emptyList(),
+                        Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
+
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")));
+
+        writeData(
+                rowData(2, 100, 15, BinaryString.fromString("20221208")),
+                rowData(2, 100, 16, BinaryString.fromString("20221208")));
+
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+        CompactAction action =
+                createAction(
+                        CompactAction.class,
+                        "compact",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--compact_strategy",
+                        "minor",
+                        "--table_conf",
+                        CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key() + "=3");
+        StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
+        action.withStreamExecutionEnvironment(env).build();
+        env.execute();
+
+        // Due to the limit of 'num-sorted-run.compaction-trigger', compaction is not
+        // performed.
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+        // Make par-15 have 3 data files and par-16 have 2, so par-16 will not be picked
+        // out to compact.
+        writeData(rowData(2, 100, 15, BinaryString.fromString("20221208")));
+
+        env = streamExecutionEnvironmentBuilder().batchMode().build();
+        action.withStreamExecutionEnvironment(env).build();
+        env.execute();
+
+        checkLatestSnapshot(table, 4, Snapshot.CommitKind.COMPACT);
+
+        List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits.size()).isEqualTo(2);
+        for (DataSplit split : splits) {
+            // Par-16 is not compacted.
+            assertThat(split.dataFiles().size())
+                    .isEqualTo(split.partition().getInt(1) == 16 ? 2 : 1);
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testBatchFullCompactStrategy() throws Exception {
+        FileStoreTable table =
+                prepareTable(
+                        Arrays.asList("dt", "hh"),
+                        Arrays.asList("dt", "hh", "k"),
+                        Collections.emptyList(),
+                        Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
+
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")));
+
+        writeData(
+                rowData(2, 100, 15, BinaryString.fromString("20221208")),
+                rowData(2, 100, 16, BinaryString.fromString("20221208")));
+
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+        CompactAction action =
+                createAction(
+                        CompactAction.class,
+                        "compact",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--compact_strategy",
+                        "full",
+                        "--table_conf",
+                        CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key() + "=3");
+        StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
+        action.withStreamExecutionEnvironment(env).build();
+        env.execute();
+
+        checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);
+
+        List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits.size()).isEqualTo(2);
+        for (DataSplit split : splits) {
+            assertThat(split.dataFiles().size()).isEqualTo(1);
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testStreamingFullCompactStrategy() throws Exception {
+        prepareTable(
+                Arrays.asList("dt", "hh"),
+                Arrays.asList("dt", "hh", "k"),
+                Collections.emptyList(),
+                Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
+        CompactAction action =
+                createAction(
+                        CompactAction.class,
+                        "compact",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--compact_strategy",
+                        "full",
+                        "--table_conf",
+                        CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key() + "=3");
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder().streamingMode().build();
+        Assertions.assertThatThrownBy(() -> action.withStreamExecutionEnvironment(env).build())
+                .hasMessage(
+                        "The full compact strategy is only supported in batch mode. Please add -Dexecution.runtime-mode=BATCH.");
+    }
+
+    @Test
+    @Timeout(60)
+    public void testCompactStrategyWithWrongUsage() throws Exception {
+        prepareTable(
+                Arrays.asList("dt", "hh"),
+                Arrays.asList("dt", "hh", "k"),
+                Collections.emptyList(),
+                Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
+        Assertions.assertThatThrownBy(
+                        () ->
+                                createAction(
+                                        CompactAction.class,
+                                        "compact",
+                                        "--warehouse",
+                                        warehouse,
+                                        "--database",
+                                        database,
+                                        "--table",
+                                        tableName,
+                                        "--compact_strategy",
+                                        "wrong_usage",
+                                        "--table_conf",
+                                        CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key()
+                                                + "=3"))
+                .hasMessage(
+                        "The compact strategy only supports 'full' or 'minor', but 'wrong_usage' is configured.");
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
index bec669acd3..d79d13f026 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
@@ -31,6 +31,7 @@ import org.apache.paimon.utils.StringUtils;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
@@ -240,6 +241,117 @@ public class CompactProcedureITCase extends CatalogITCaseBase {
         checkLatestSnapshot(table, 21, Snapshot.CommitKind.OVERWRITE);
     }
 
+    // ----------------------- Minor Compact -----------------------
+
+    @Test
+    public void testBatchMinorCompactStrategy() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k INT,"
+                        + " v INT,"
+                        + " hh INT,"
+                        + " dt STRING,"
+                        + " PRIMARY KEY (k, dt, hh) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt, hh) WITH ("
+                        + " 'write-only' = 'true',"
+                        + " 'bucket' = '1'"
+                        + ")");
+        FileStoreTable table = paimonTable("T");
+        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+
+        sql("INSERT INTO T VALUES (1, 100, 15, '20221208'), (1, 100, 16, 
'20221208')");
+        sql("INSERT INTO T VALUES (2, 100, 15, '20221208'), (2, 100, 16, 
'20221208')");
+
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+        sql(
+                "CALL sys.compact(`table` => 'default.T', compact_strategy => 
'minor', "
+                        + "options => 'num-sorted-run.compaction-trigger=3')");
+
+        // Compaction is not performed because the 'num-sorted-run.compaction-trigger'
+        // threshold of 3 has not been reached.
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+        // Give par-15 three data files and par-16 two, so par-16 will not be picked for compaction.
+        sql("INSERT INTO T VALUES (1, 100, 15, '20221208')");
+
+        sql(
+                "CALL sys.compact(`table` => 'default.T', compact_strategy => 
'minor', "
+                        + "options => 'num-sorted-run.compaction-trigger=3')");
+
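+        // The extra insert creates snapshot 3 (APPEND); the minor compact then
+        // commits snapshot 4 (COMPACT).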
+        checkLatestSnapshot(table, 4, Snapshot.CommitKind.COMPACT);
+
+        List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits.size()).isEqualTo(2);
+        for (DataSplit split : splits) {
+            // Par-16 is not compacted.
+            assertThat(split.dataFiles().size())
+                    .isEqualTo(split.partition().getInt(1) == 16 ? 2 : 1);
+        }
+    }
+
+    @Test
+    public void testBatchFullCompactStrategy() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k INT,"
+                        + " v INT,"
+                        + " hh INT,"
+                        + " dt STRING,"
+                        + " PRIMARY KEY (k, dt, hh) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt, hh) WITH ("
+                        + " 'write-only' = 'true',"
+                        + " 'bucket' = '1'"
+                        + ")");
+        FileStoreTable table = paimonTable("T");
+        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+
+        sql("INSERT INTO T VALUES (1, 100, 15, '20221208'), (1, 100, 16, 
'20221208')");
+        sql("INSERT INTO T VALUES (2, 100, 15, '20221208'), (2, 100, 16, 
'20221208')");
+
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
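+        // The 'full' strategy compacts even though the sorted-run count (2) is below the trigger (3).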
+        sql(
+                "CALL sys.compact(`table` => 'default.T', compact_strategy => 
'full', "
+                        + "options => 'num-sorted-run.compaction-trigger=3')");
+
+        checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);
+
+        List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits.size()).isEqualTo(2);
+        for (DataSplit split : splits) {
+            // Every partition is fully compacted down to a single data file.
+            assertThat(split.dataFiles().size()).isEqualTo(1);
+        }
+    }
+
+    @Test
+    public void testStreamFullCompactStrategy() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k INT,"
+                        + " v INT,"
+                        + " hh INT,"
+                        + " dt STRING,"
+                        + " PRIMARY KEY (k, dt, hh) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt, hh) WITH ("
+                        + " 'write-only' = 'true',"
+                        + " 'bucket' = '1'"
+                        + ")");
+        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                streamSqlIter(
+                                                "CALL sys.compact(`table` => 
'default.T', compact_strategy => 'full', "
+                                                        + "options => 
'num-sorted-run.compaction-trigger=3')")
+                                        .close())
+                .hasMessageContaining(
+                        "The full compact strategy is only supported in batch 
mode. Please add -Dexecution.runtime-mode=BATCH.");
+    }
+
     private void checkLatestSnapshot(
             FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) {
         SnapshotManager snapshotManager = table.snapshotManager();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index c38ac4b3d6..42293ca284 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -132,7 +132,7 @@ public class CompactorSinkITCase extends AbstractTestBase {
                         .withContinuousMode(false)
                         .withPartitionPredicate(predicate)
                         .build();
-        new CompactorSinkBuilder(table).withFullCompaction(true).withInput(source).build();
+        new CompactorSinkBuilder(table, true).withInput(source).build();
         env.execute();
 
         snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
@@ -181,8 +181,8 @@ public class CompactorSinkITCase extends AbstractTestBase {
                                                 FlinkConnectorOptions.SINK_PARALLELISM.key(),
                                                 String.valueOf(sinkParalellism));
                                     }
-                                }))
-                .withFullCompaction(false)
+                                }),
+                        false)
                 .withInput(source)
                 .build();
 
@@ -275,7 +275,13 @@ public class CompactorSinkITCase extends AbstractTestBase {
     protected MultiTablesStoreCompactOperator createMultiTablesCompactOperator(
             Catalog.Loader catalogLoader) throws Exception {
         return new MultiTablesStoreCompactOperator(
-                catalogLoader, commitUser, new CheckpointConfig(), false, false, new Options());
+                catalogLoader,
+                commitUser,
+                new CheckpointConfig(),
+                false,
+                false,
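+                // New boolean added alongside this change; presumably whether to run full compaction.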
+                true,
+                new Options());
     }
 
     private static byte[] partition(String dt, int hh) {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 71cf04cf5e..4a43e39c31 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -107,6 +107,7 @@ public class CompactProcedure extends BaseProcedure {
             new ProcedureParameter[] {
                 ProcedureParameter.required("table", StringType),
                 ProcedureParameter.optional("partitions", StringType),
+                ProcedureParameter.optional("compact_strategy", StringType),
                 ProcedureParameter.optional("order_strategy", StringType),
                 ProcedureParameter.optional("order_by", StringType),
                 ProcedureParameter.optional("where", StringType),
@@ -120,6 +121,9 @@ public class CompactProcedure extends BaseProcedure {
                         new StructField("result", DataTypes.BooleanType, true, 
Metadata.empty())
                     });
 
+    private static final String MINOR = "minor";
+    private static final String FULL = "full";
+
     protected CompactProcedure(TableCatalog tableCatalog) {
         super(tableCatalog);
     }
@@ -138,15 +142,17 @@ public class CompactProcedure extends BaseProcedure {
     public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
         String partitions = blank(args, 1) ? null : args.getString(1);
-        String sortType = blank(args, 2) ? TableSorter.OrderType.NONE.name() : args.getString(2);
+        // Default to the full compact strategy; the remaining positional arguments shift by one.
+        String compactStrategy = blank(args, 2) ? FULL : args.getString(2);
+        String sortType = blank(args, 3) ? TableSorter.OrderType.NONE.name() : args.getString(3);
         List<String> sortColumns =
-                blank(args, 3)
+                blank(args, 4)
                         ? Collections.emptyList()
-                        : Arrays.asList(args.getString(3).split(","));
-        String where = blank(args, 4) ? null : args.getString(4);
-        String options = args.isNullAt(5) ? null : args.getString(5);
+                        : Arrays.asList(args.getString(4).split(","));
+        String where = blank(args, 5) ? null : args.getString(5);
+        String options = args.isNullAt(6) ? null : args.getString(6);
         Duration partitionIdleTime =
-                blank(args, 6) ? null : TimeUtils.parseDuration(args.getString(6));
+                blank(args, 7) ? null : TimeUtils.parseDuration(args.getString(7));
         if (TableSorter.OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
             throw new IllegalArgumentException(
                     "order_strategy \"none\" cannot work with order_by 
columns.");
@@ -155,6 +161,14 @@ public class CompactProcedure extends BaseProcedure {
             throw new IllegalArgumentException(
                     "sort compact do not support 'partition_idle_time'.");
         }
+
+        if (!(compactStrategy.equalsIgnoreCase(FULL) || compactStrategy.equalsIgnoreCase(MINOR))) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The compact strategy only supports 'full' or 
'minor', but '%s' is configured.",
+                            compactStrategy));
+        }
+
         checkArgument(
                 partitions == null || where == null,
                 "partitions and where cannot be used together.");
@@ -192,6 +206,7 @@ public class CompactProcedure extends BaseProcedure {
                             newInternalRow(
                                     execute(
                                             (FileStoreTable) table,
+                                            compactStrategy,
                                             sortType,
                                             sortColumns,
                                             relation,
@@ -212,6 +227,7 @@ public class CompactProcedure extends BaseProcedure {
 
     private boolean execute(
             FileStoreTable table,
+            String compactStrategy,
             String sortType,
             List<String> sortColumns,
             DataSourceV2Relation relation,
@@ -219,6 +235,7 @@ public class CompactProcedure extends BaseProcedure {
             @Nullable Duration partitionIdleTime) {
         BucketMode bucketMode = table.bucketMode();
         TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
+        boolean fullCompact = compactStrategy.equalsIgnoreCase(FULL);
         Predicate filter =
                 condition == null
                         ? null
@@ -233,7 +250,8 @@ public class CompactProcedure extends BaseProcedure {
             switch (bucketMode) {
                 case HASH_FIXED:
                 case HASH_DYNAMIC:
-                    compactAwareBucketTable(table, filter, partitionIdleTime, javaSparkContext);
+                    compactAwareBucketTable(
+                            table, fullCompact, filter, partitionIdleTime, javaSparkContext);
                     break;
                 case BUCKET_UNAWARE:
                     compactUnAwareBucketTable(table, filter, partitionIdleTime, javaSparkContext);
@@ -259,6 +277,7 @@ public class CompactProcedure extends BaseProcedure {
 
     private void compactAwareBucketTable(
             FileStoreTable table,
+            boolean fullCompact,
             @Nullable Predicate filter,
             @Nullable Duration partitionIdleTime,
             JavaSparkContext javaSparkContext) {
@@ -304,7 +323,7 @@ public class CompactProcedure extends BaseProcedure {
                                                             SerializationUtils.deserializeBinaryRow(
                                                                     pair.getLeft()),
                                                             pair.getRight(),
-                                                            true);
+                                                            fullCompact);
                                                 }
                                                 CommitMessageSerializer serializer =
                                                         new CommitMessageSerializer();
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 130860c835..31f78f61c2 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -39,6 +39,56 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
 
   import testImplicits._
 
+  // ----------------------- Minor Compact -----------------------
+
+  test("Paimon Procedure: compact aware bucket pk table with minor compact 
strategy") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING, pt STRING)
+                   |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='1', 'write-only'='true')
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+
+      val table = loadTable("T")
+
+      spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+      spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
+
+      Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.APPEND)).isTrue
+      Assertions.assertThat(lastSnapshotId(table)).isEqualTo(2)
+
+      spark.sql(
+        "CALL sys.compact(table => 'T', compact_strategy => 'minor'," +
+          "options => 'num-sorted-run.compaction-trigger=3')")
+
+      // Compaction is not performed because the 'num-sorted-run.compaction-trigger'
+      // threshold of 3 has not been reached.
+      Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.APPEND)).isTrue
+      Assertions.assertThat(lastSnapshotId(table)).isEqualTo(2)
+
+      // Give p1 three data files and p2 two, so p2 will not be picked for compaction.
+      spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1')")
+
+      spark.sql(
+        "CALL sys.compact(table => 'T', compact_strategy => 'minor'," +
+          "options => 'num-sorted-run.compaction-trigger=3')")
+
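+      // The extra insert creates snapshot 3 (APPEND); the minor compact
+      // commits snapshot 4 (COMPACT).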
+      Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4)
+      Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+
+      val splits = table.newSnapshotReader.read.dataSplits
+      splits.forEach(
+        split => {
+          Assertions
+            .assertThat(split.dataFiles.size)
+            .isEqualTo(if (split.partition().getString(0).toString == "p2") 2 
else 1)
+        })
+    }
+  }
+
+  // ----------------------- Sort Compact -----------------------
+
   test("Paimon Procedure: sort compact") {
     failAfter(streamingTimeout) {
       withTempDir {
