This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f3be98671 [flink][spark] support history partition compaction in dedicated compaction job. (#3953)
f3be98671 is described below

commit f3be98671de69639c9de2381eeae84aa4de2db52
Author: LsomeYeah <[email protected]>
AuthorDate: Sun Aug 18 22:22:47 2024 +0800

    [flink][spark] support history partition compaction in dedicated compaction job. (#3953)
---
 docs/content/flink/procedures.md                   |   3 +
 docs/content/maintenance/dedicated-compaction.md   | 115 +++++++++++++++++-
 docs/content/spark/procedures.md                   |   4 +-
 .../paimon/flink/procedure/CompactProcedure.java   |  29 +++++
 .../ProcedurePositionalArgumentsITCase.java        |   3 +
 .../apache/paimon/flink/action/CompactAction.java  |  17 ++-
 .../paimon/flink/action/CompactActionFactory.java  |  21 +++-
 .../paimon/flink/action/CompactDatabaseAction.java |  26 ++++-
 .../flink/action/CompactDatabaseActionFactory.java |  28 ++++-
 .../UnawareBucketCompactionTopoBuilder.java        |  44 +++++++
 .../flink/procedure/CompactDatabaseProcedure.java  |  23 ++++
 .../CombinedTableCompactorSourceBuilder.java       |  16 ++-
 .../flink/source/CompactorSourceBuilder.java       |  45 +++++++
 .../operator/CombinedAwareBatchSourceFunction.java |   9 +-
 .../CombinedUnawareBatchSourceFunction.java        |  54 ++++++++-
 .../source/operator/MultiTablesReadOperator.java   |  59 +++++++++-
 .../operator/MultiUnawareTablesReadOperator.java   | 129 +++++++++++++++++++++
 .../flink/utils/MultiTablesCompactorUtil.java      |  14 +++
 .../paimon/flink/action/CompactActionITCase.java   | 108 +++++++++++++++++
 .../flink/action/CompactDatabaseActionITCase.java  | 101 ++++++++++++++++
 .../paimon/flink/source/CompactorSourceITCase.java |  49 ++++++++
 .../MultiTablesCompactorSourceBuilderITCase.java   | 104 +++++++++++++++++
 .../paimon/spark/procedure/CompactProcedure.java   |  80 +++++++++++--
 .../spark/procedure/CompactProcedureTestBase.scala |  87 ++++++++++++++
 24 files changed, 1134 insertions(+), 34 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 6209791b8..f028659a5 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -68,6 +68,7 @@ All available procedures are listed below.
             <li>order_by(optional): the columns need to be sort. Left empty if 
'order_strategy' is 'none'.</li>
             <li>options(optional): additional dynamic options of the 
table.</li>
             <li>where(optional): partition predicate(Can't be used together 
with "partitions"). Note: as where is a keyword,a pair of backticks need to add 
around like `where`.</li>
+            <li>partition_idle_time(optional): this is used to trigger a full compaction for partitions that have not received any new data for 'partition_idle_time'. Only these partitions will be compacted. This argument cannot be used together with order compact.</li>
       </td>
       <td>
          -- use partition filter <br/>
@@ -85,6 +86,7 @@ All available procedures are listed below.
          CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 
'includingTables') <br/><br/> 
          CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 
'includingTables', 'excludingTables') <br/><br/>
          CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 
'includingTables', 'excludingTables', 'tableOptions')
+         CALL [catalog.]sys.compact_database('includingDatabases', 'mode', 
'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')
       </td>
       <td>
          To compact databases. Arguments:
@@ -95,6 +97,7 @@ All available procedures are listed below.
             <li>includingTables: to specify tables. You can use regular 
expression.</li>
             <li>excludingTables: to specify tables that are not compacted. You 
can use regular expression.</li>
             <li>tableOptions: additional dynamic options of the table.</li>
+            <li>partition_idle_time: this is used to trigger a full compaction for partitions that have not received any new data for 'partition_idle_time'. Only these partitions will be compacted.</li>
       </td>
       <td>
          CALL sys.compact_database('db1|db2', 'combined', 'table_.*', 
'ignore', 'sink.parallelism=4')
diff --git a/docs/content/maintenance/dedicated-compaction.md 
b/docs/content/maintenance/dedicated-compaction.md
index 981823540..f7eb03e1d 100644
--- a/docs/content/maintenance/dedicated-compaction.md
+++ b/docs/content/maintenance/dedicated-compaction.md
@@ -128,7 +128,7 @@ For more usage of the compact action, see
 
 {{< /tab >}}
 
-{{< tab "Flink" >}}
+{{< tab "Flink SQL" >}}
 
 Run the following sql:
 
@@ -178,7 +178,7 @@ You can run the following command to submit a compaction 
job for multiple databa
 * `--including_databases` is used to specify which database is to be 
compacted. In compact mode, you need to specify a database name, in 
compact_database mode, you could specify multiple database, regular expression 
is supported.
 * `--including_tables` is used to specify which source tables are to be 
compacted, you must use '|' to separate multiple tables, the format is 
`databaseName.tableName`, regular expression is supported. For example, 
specifying "--including_tables db1.t1|db2.+" means to compact table 'db1.t1' 
and all tables in the db2 database.
 * `--excluding_tables`  is used to specify which source tables are not to be 
compacted. The usage is same as "--including_tables". "--excluding_tables" has 
higher priority than "--including_tables" if you specified both.
-* `mode` is used to specify compaction mode. Possible values:
+* `--mode` is used to specify compaction mode. Possible values:
   * "divided" (the default mode if you haven't specified one): start a sink 
for each table, the compaction of the new table requires restarting the job.
   * "combined": start a single combined sink for all tables, the new table 
will be automatically compacted.
 * `--catalog_conf` is the configuration for Paimon catalog. Each configuration 
should be specified in the format `key=value`. See [here]({{< ref 
"maintenance/configurations" >}}) for a complete list of catalog configurations.
@@ -243,7 +243,7 @@ For more usage of the compact_database action, see
 
 {{< /tab >}}
 
-{{< tab "Flink" >}}
+{{< tab "Flink SQL" >}}
 
 Run the following sql:
 
@@ -284,7 +284,7 @@ you can trigger a compact with specified column sort to 
speed up queries.
     --database <database-name> \ 
     --table <table-name> \
     --order_strategy <orderType> \
-    --order_by <col1,col2,...>
+    --order_by <col1,col2,...> \
     [--partition <partition-name>] \
     [--catalog_conf <paimon-catalog-conf> [--catalog_conf 
<paimon-catalog-conf> ...]] \
     [--table_conf <paimon-table-dynamic-conf> [--table_conf 
<paimon-table-dynamic-conf>] ...]
@@ -296,7 +296,7 @@ The sort parallelism is the same as the sink parallelism, 
you can dynamically sp
 
 {{< /tab >}}
 
-{{< tab "Flink" >}}
+{{< tab "Flink SQL" >}}
 
 Run the following sql:
 
@@ -308,3 +308,108 @@ CALL sys.compact(`table` => 'default.T', order_strategy 
=> 'zorder', order_by =>
 
 {{< /tabs >}}
 
+## Historical Partition Compact
+
+You can run the following command to submit a compaction job for partitions which have not received any new data for a period of time. Small files in those partitions will be fully compacted.
+
+{{< hint info >}}
+
+This feature is currently only supported in batch mode.
+
+{{< /hint >}}
+
+### For Table
+
+This compacts a single table.
+{{< tabs "history-partition-compaction-job for table" >}}
+
+{{< tab "Flink Action Jar" >}}
+
+```bash  
+<FLINK_HOME>/bin/flink run \
+    -D execution.runtime-mode=batch \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact \
+    --warehouse <warehouse-path> \
+    --database <database-name> \ 
+    --table <table-name> \
+    --partition_idle_time <partition-idle-time> \ 
+    [--partition <partition-name>] \
+    [--catalog_conf <paimon-catalog-conf> [--catalog_conf 
<paimon-catalog-conf> ...]] \
+    [--table_conf <paimon-table-dynamic-conf> [--table_conf 
<paimon-table-dynamic-conf>] ...]
+```
+There is one new configuration in `Historical Partition Compact`:
+
+* `--partition_idle_time`: this is used to trigger a full compaction for partitions that have not received any new data for 'partition_idle_time'. Only these partitions will be compacted.
+
+{{< /tab >}}
+
+{{< tab "Flink SQL" >}}
+
+Run the following sql:
+
+```sql
+-- history partition compact table
+CALL sys.compact(`table` => 'default.T', 'partition_idle_time' => '5s')
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+### For Databases
+
+This compacts multiple tables across different databases.
+{{< tabs "history-partition-compaction-job for databases" >}}
+
+{{< tab "Flink Action Jar" >}}
+
+```bash  
+<FLINK_HOME>/bin/flink run \
+    -D execution.runtime-mode=batch \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact_database \
+    --warehouse <warehouse-path> \
+    --including_databases <database-name|name-regular-expr> \
+    --partition_idle_time <partition-idle-time> \ 
+    [--including_tables <paimon-table-name|name-regular-expr>] \
+    [--excluding_tables <paimon-table-name|name-regular-expr>] \
+    [--mode <compact-mode>] \
+    [--catalog_conf <paimon-catalog-conf> [--catalog_conf 
<paimon-catalog-conf> ...]] \
+    [--table_conf <paimon-table_conf> [--table_conf <paimon-table_conf> ...]]
+```
+
+Example: compact historical partitions for tables in a database
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact_database \
+    --warehouse s3:///path/to/warehouse \
+    --including_databases test_db \
+    --partition_idle_time 60s \
+    --catalog_conf s3.endpoint=https://****.com \
+    --catalog_conf s3.access-key=***** \
+    --catalog_conf s3.secret-key=*****
+```
+
+{{< /tab >}}
+
+{{< tab "Flink SQL" >}}
+
+Run the following sql:
+
+```sql
+-- history partition compact table
+CALL sys.compact_database('includingDatabases', 'mode', 'includingTables', 
'excludingTables', 'tableOptions', 'partition_idle_time')
+```
+
+Example: compact historical partitions for tables in a database
+
+```sql
+-- history partition compact table
+CALL sys.compact_database('test_db', 'combined', '', '', '', '60s')
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
\ No newline at end of file
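
The documentation above describes eligibility in prose; as a concrete illustration (not Paimon code, partition names and timestamps are made up), the sketch below applies the same cutoff rule the job uses: a partition qualifies when its last file creation time is no later than now minus `partition_idle_time`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: Paimon keys partitions by BinaryRow and reads
// lastFileCreationTime from PartitionEntry; plain strings stand in here.
public class HistoricalPartitionExample {

    /** A partition is "historical" if it has been idle for at least idleTime. */
    static boolean isHistorical(long lastFileCreationMillis, Duration idleTime, Instant now) {
        long cutoff = now.minus(idleTime).toEpochMilli();
        return lastFileCreationMillis <= cutoff;
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        Map<String, Long> lastWrite = new LinkedHashMap<>();
        lastWrite.put("dt=20240817", now.minus(Duration.ofHours(26)).toEpochMilli()); // idle
        lastWrite.put("dt=20240818", now.minus(Duration.ofMinutes(5)).toEpochMilli()); // active

        Duration idleTime = Duration.ofDays(1); // e.g. a one-day partition_idle_time
        lastWrite.forEach(
                (partition, ts) ->
                        System.out.println(partition + " eligible: " + isHistorical(ts, idleTime, now)));
    }
}
```

Only partitions reported as eligible receive the full compaction; partitions that are still receiving data are skipped.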
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 9a53a79ee..d77163375 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -47,11 +47,13 @@ This section introduce all available spark procedures about 
paimon.
             <li>order_strategy: 'order' or 'zorder' or 'hilbert' or 'none'. 
Left empty for 'none'.</li>
             <li>order_columns: the columns need to be sort. Left empty if 
'order_strategy' is 'none'.</li>
             <li>max_concurrent_jobs: when sort compact is used, files in one 
partition are grouped and submitted as a single spark compact job. This 
parameter controls the maximum number of jobs that can be submitted 
simultaneously. The default value is 15.</li>
+            <li>partition_idle_time: this is used to trigger a full compaction for partitions that have not received any new data for 'partition_idle_time'. Only these partitions will be compacted. This argument cannot be used with order compact.</li>
       </td>
       <td>
          SET spark.sql.shuffle.partitions=10; --set the compact parallelism 
<br/><br/>
          CALL sys.compact(table => 'T', partitions => 'p=0;p=1',  
order_strategy => 'zorder', order_by => 'a,b') <br/><br/>
-         CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy 
=> 'zorder', order_by => 'a,b')
+         CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy 
=> 'zorder', order_by => 'a,b') <br/><br/>
+         CALL sys.compact(table => 'T', partition_idle_time => '60s')
       </td>
     </tr>
     <tr>
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 2fcbc98ba..e6cbb299c 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -22,7 +22,9 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.CompactAction;
 import org.apache.paimon.flink.action.SortCompactAction;
 import org.apache.paimon.utils.ParameterUtils;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.utils.TimeUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
 
@@ -96,6 +98,27 @@ public class CompactProcedure extends ProcedureBase {
             String tableOptions,
             String whereSql)
             throws Exception {
+        return call(
+                procedureContext,
+                tableId,
+                partitions,
+                orderStrategy,
+                orderByColumns,
+                tableOptions,
+                whereSql,
+                "");
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String partitions,
+            String orderStrategy,
+            String orderByColumns,
+            String tableOptions,
+            String whereSql,
+            String partitionIdleTime)
+            throws Exception {
 
         String warehouse = catalog.warehouse();
         Map<String, String> catalogOptions = catalog.options();
@@ -114,8 +137,14 @@ public class CompactProcedure extends ProcedureBase {
                             identifier.getObjectName(),
                             catalogOptions,
                             tableConf);
+            if (!(StringUtils.isBlank(partitionIdleTime))) {
+                
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
+            }
             jobName = "Compact Job";
         } else if (!orderStrategy.isEmpty() && !orderByColumns.isEmpty()) {
+            Preconditions.checkArgument(
+                    StringUtils.isBlank(partitionIdleTime),
+                    "sort compact do not support 'partition_idle_time'.");
             action =
                     new SortCompactAction(
                                     warehouse,
diff --git 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index 1db04aa1d..cd9a8b184 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -58,6 +58,9 @@ public class ProcedurePositionalArgumentsITCase extends 
CatalogITCaseBase {
                                 sql(
                                         "CALL sys.compact('default.T', '', '', 
'', 'sink.parallelism=1','pt=1')"))
                 .doesNotThrowAnyException();
+        assertThatCode(() -> sql("CALL sys.compact('default.T', '', 'zorder', 
'k', '','','5s')"))
+                .message()
+                .contains("sort compact do not support 
'partition_idle_time'.");
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 59ca818c3..3cb3513dd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -38,6 +38,9 @@ import org.apache.flink.table.data.RowData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -54,6 +57,8 @@ public class CompactAction extends TableActionBase {
 
     private String whereSql;
 
+    @Nullable private Duration partitionIdleTime = null;
+
     public CompactAction(String warehouse, String database, String tableName) {
         this(warehouse, database, tableName, Collections.emptyMap(), 
Collections.emptyMap());
     }
@@ -90,6 +95,11 @@ public class CompactAction extends TableActionBase {
         return this;
     }
 
+    public CompactAction withPartitionIdleTime(@Nullable Duration 
partitionIdleTime) {
+        this.partitionIdleTime = partitionIdleTime;
+        return this;
+    }
+
     @Override
     public void build() throws Exception {
         ReadableConfig conf = env.getConfiguration();
@@ -120,7 +130,11 @@ public class CompactAction extends TableActionBase {
 
         sourceBuilder.withPartitionPredicate(getPredicate());
         DataStreamSource<RowData> source =
-                
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
+                sourceBuilder
+                        .withEnv(env)
+                        .withContinuousMode(isStreaming)
+                        .withPartitionIdleTime(partitionIdleTime)
+                        .build();
         sinkBuilder.withInput(source).build();
     }
 
@@ -132,6 +146,7 @@ public class CompactAction extends TableActionBase {
 
         
unawareBucketCompactionTopoBuilder.withPartitionPredicate(getPredicate());
         unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
+        
unawareBucketCompactionTopoBuilder.withPartitionIdleTime(partitionIdleTime);
         unawareBucketCompactionTopoBuilder.build();
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index c65de5c7c..f43c7a747 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -18,6 +18,9 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.TimeUtils;
+
 import org.apache.flink.api.java.tuple.Tuple3;
 
 import java.util.List;
@@ -34,6 +37,8 @@ public class CompactActionFactory implements ActionFactory {
 
     private static final String WHERE = "where";
 
+    private static final String PARTITION_IDLE_TIME = "partition_idle_time";
+
     @Override
     public String identifier() {
         return IDENTIFIER;
@@ -47,6 +52,9 @@ public class CompactActionFactory implements ActionFactory {
 
         CompactAction action;
         if (params.has(ORDER_STRATEGY)) {
+            Preconditions.checkArgument(
+                    !params.has(PARTITION_IDLE_TIME),
+                    "sort compact do not support 'partition_idle_time'.");
             action =
                     new SortCompactAction(
                                     tablePath.f0,
@@ -64,6 +72,10 @@ public class CompactActionFactory implements ActionFactory {
                             tablePath.f2,
                             catalogConfig,
                             optionalConfigMap(params, TABLE_CONF));
+            if (params.has(PARTITION_IDLE_TIME)) {
+                action.withPartitionIdleTime(
+                        
TimeUtils.parseDuration(params.get(PARTITION_IDLE_TIME)));
+            }
         }
 
         if (params.has(PARTITION)) {
@@ -88,7 +100,8 @@ public class CompactActionFactory implements ActionFactory {
                         + "--table <table_name> [--partition <partition_name>]"
                         + "[--order_strategy <order_strategy>]"
                         + "[--table_conf <key>=<value>]"
-                        + "[--order_by <order_columns>]");
+                        + "[--order_by <order_columns>]"
+                        + "[--partition_idle_time <partition_idle_time>]");
         System.out.println(
                 "  compact --warehouse s3://path/to/warehouse --database 
<database_name> "
                         + "--table <table_name> [--catalog_conf 
<paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]]");
@@ -103,6 +116,9 @@ public class CompactActionFactory implements ActionFactory {
         System.out.println(
                 "  order compact now only support append-only table with 
bucket=-1, please don't specify --order_strategy parameter if your table does 
not meet the request");
         System.out.println("  order_strategy now only support zorder in batch 
mode");
+        System.out.println(
+                "  partition_idle_time now can not be used with order_strategy, please don't specify --partition_idle_time when using order compact");
+        System.out.println("  partition_idle_time now is only supported in batch mode");
         System.out.println();
 
         System.out.println("Examples:");
@@ -113,6 +129,9 @@ public class CompactActionFactory implements ActionFactory {
         System.out.println(
                 "  compact --warehouse hdfs:///path/to/warehouse --database 
test_db --table test_table "
                         + "--partition dt=20221126,hh=08 --partition 
dt=20221127,hh=09");
+        System.out.println(
+                "  compact --warehouse hdfs:///path/to/warehouse --database 
test_db --table test_table "
+                        + "--partition_idle_time 10s");
         System.out.println(
                 "  compact --warehouse s3:///path/to/warehouse "
                         + "--database test_db "
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
index 5de14768a..7c5854e2d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,6 +69,8 @@ public class CompactDatabaseAction extends ActionBase {
 
     private Options tableOptions = new Options();
 
+    @Nullable private Duration partitionIdleTime = null;
+
     public CompactDatabaseAction(String warehouse, Map<String, String> 
catalogConfig) {
         super(warehouse, catalogConfig);
     }
@@ -101,6 +104,11 @@ public class CompactDatabaseAction extends ActionBase {
         return this;
     }
 
+    public CompactDatabaseAction withPartitionIdleTime(@Nullable Duration 
partitionIdleTime) {
+        this.partitionIdleTime = partitionIdleTime;
+        return this;
+    }
+
     private boolean shouldCompactionTable(String paimonFullTableName) {
         boolean shouldCompaction = 
includingPattern.matcher(paimonFullTableName).matches();
         if (excludingPattern != null) {
@@ -191,11 +199,14 @@ public class CompactDatabaseAction extends ActionBase {
                 conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING;
         CombinedTableCompactorSourceBuilder sourceBuilder =
                 new CombinedTableCompactorSourceBuilder(
-                        catalogLoader(),
-                        databasePattern,
-                        includingPattern,
-                        excludingPattern,
-                        
tableOptions.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis());
+                                catalogLoader(),
+                                databasePattern,
+                                includingPattern,
+                                excludingPattern,
+                                tableOptions
+                                        
.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL)
+                                        .toMillis())
+                        .withPartitionIdleTime(partitionIdleTime);
 
         // multi bucket table which has multi bucket in a partition like fix 
bucket and dynamic
         // bucket
@@ -225,7 +236,9 @@ public class CompactDatabaseAction extends ActionBase {
             FileStoreTable table,
             boolean isStreaming) {
 
-        CompactorSourceBuilder sourceBuilder = new 
CompactorSourceBuilder(fullName, table);
+        CompactorSourceBuilder sourceBuilder =
+                new CompactorSourceBuilder(fullName, table)
+                        .withPartitionIdleTime(partitionIdleTime);
         CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
 
         DataStreamSource<RowData> source =
@@ -242,6 +255,7 @@ public class CompactDatabaseAction extends ActionBase {
                 new UnawareBucketCompactionTopoBuilder(env, fullName, table);
 
         unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
+        
unawareBucketCompactionTopoBuilder.withPartitionIdleTime(partitionIdleTime);
         unawareBucketCompactionTopoBuilder.build();
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
index a7b761044..b26870907 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.utils.TimeUtils;
+
 import java.util.Optional;
 
 /** Factory to create {@link CompactDatabaseAction}. */
@@ -29,6 +31,7 @@ public class CompactDatabaseActionFactory implements 
ActionFactory {
     private static final String INCLUDING_TABLES = "including_tables";
     private static final String EXCLUDING_TABLES = "excluding_tables";
     private static final String MODE = "mode";
+    private static final String PARTITION_IDLE_TIME = "partition_idle_time";
 
     @Override
     public String identifier() {
@@ -47,6 +50,10 @@ public class CompactDatabaseActionFactory implements 
ActionFactory {
                 .excludingTables(params.get(EXCLUDING_TABLES))
                 .withDatabaseCompactMode(params.get(MODE))
                 .withTableOptions(optionalConfigMap(params, TABLE_CONF));
+        String partitionIdleTime = params.get(PARTITION_IDLE_TIME);
+        if (partitionIdleTime != null) {
+            
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
+        }
 
         return Optional.of(action);
     }
@@ -59,14 +66,20 @@ public class CompactDatabaseActionFactory implements 
ActionFactory {
 
         System.out.println("Syntax:");
         System.out.println(
-                "  compact_database --warehouse <warehouse_path> --database 
<database_name> "
+                "  compact_database --warehouse <warehouse_path> 
--including_databases <database-name|name-regular-expr> "
                         + "[--including_tables 
<paimon_table_name|name_regular_expr>] "
-                        + "[--excluding_tables 
<paimon_table_name|name_regular_expr>] ");
+                        + "[--excluding_tables 
<paimon_table_name|name_regular_expr>] "
+                        + "[--mode <compact_mode>]"
+                        + "[--partition_idle_time <partition_idle_time>]");
         System.out.println(
-                "  compact_database --warehouse s3://path/to/warehouse 
--database <database_name> "
+                "  compact_database --warehouse s3://path/to/warehouse 
--including_databases <database-name|name-regular-expr> "
                         + "[--catalog_conf <paimon_catalog_conf> 
[--catalog_conf <paimon_catalog_conf> ...]]");
         System.out.println();
 
+        System.out.println(
+                "--including_databases is used to specify which databases are 
to be compacted. "
+                        + "You must use '|' to separate multiple databases, 
Regular expression is supported.");
+
         System.out.println(
                 "--including_tables is used to specify which source tables are 
to be compacted. "
                         + "You must use '|' to separate multiple tables, the 
format is `databaseName.tableName`, Regular expression is supported.");
@@ -75,14 +88,19 @@ public class CompactDatabaseActionFactory implements 
ActionFactory {
                         + "The usage is same as --including_tables.");
         System.out.println(
                 "--excluding_tables has higher priority than 
--including_tables if you specified both.");
+        System.out.println(
+                "--mode is used to specify compaction mode. Possible values: 
divided, combined.");
+        System.out.println(
+                "--partition_idle_time is used to do a full compaction for partitions which have not received any new data for 'partition_idle_time'. And only these partitions will be compacted.");
+        System.out.println("--partition_idle_time is only supported in batch mode.");
         System.out.println();
 
         System.out.println("Examples:");
         System.out.println(
-                "  compact_database --warehouse hdfs:///path/to/warehouse 
--database test_db");
+                "  compact_database --warehouse hdfs:///path/to/warehouse 
--including_databases test_db");
         System.out.println(
                 "  compact_database --warehouse s3:///path/to/warehouse "
-                        + "--database test_db "
+                        + "--including_databases test_db "
                         + "--catalog_conf s3.endpoint=https://****.com "
                         + "--catalog_conf s3.access-key=***** "
                         + "--catalog_conf s3.secret-key=***** ");
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
index 3efb58a7f..8c6ed4c9f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -19,21 +19,32 @@
 package org.apache.paimon.flink.compact;
 
 import org.apache.paimon.append.UnawareAppendCompactionTask;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
 import org.apache.paimon.flink.source.BucketUnawareCompactSource;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * Build for unaware-bucket table flink compaction job.
  *
@@ -52,6 +63,7 @@ public class UnawareBucketCompactionTopoBuilder {
     private boolean isContinuous = false;
 
     @Nullable private Predicate partitionPredicate;
+    @Nullable private Duration partitionIdleTime = null;
 
     public UnawareBucketCompactionTopoBuilder(
             StreamExecutionEnvironment env, String tableIdentifier, 
FileStoreTable table) {
@@ -68,15 +80,47 @@ public class UnawareBucketCompactionTopoBuilder {
         this.partitionPredicate = predicate;
     }
 
+    public void withPartitionIdleTime(@Nullable Duration partitionIdleTime) {
+        this.partitionIdleTime = partitionIdleTime;
+    }
+
     public void build() {
         // build source from UnawareSourceFunction
         DataStreamSource<UnawareAppendCompactionTask> source = buildSource();
+        if (isContinuous) {
+            Preconditions.checkArgument(
+                    partitionIdleTime == null, "Streaming mode does not 
support partitionIdleTime");
+        } else if (partitionIdleTime != null) {
+            Map<BinaryRow, Long> partitionInfo = getPartitionInfo(table);
+            long historyMilli =
+                    LocalDateTime.now()
+                            .minus(partitionIdleTime)
+                            .atZone(ZoneId.systemDefault())
+                            .toInstant()
+                            .toEpochMilli();
+            SingleOutputStreamOperator<UnawareAppendCompactionTask> 
filterStream =
+                    source.filter(
+                            task -> {
+                                BinaryRow partition = task.partition();
+                                return partitionInfo.get(partition) <= 
historyMilli;
+                            });
+            source = new DataStreamSource<>(filterStream);
+        }
 
         // from source, construct the full flink job
         sinkFromSource(source);
     }
 
+    private Map<BinaryRow, Long> getPartitionInfo(FileStoreTable table) {
+        List<PartitionEntry> partitions = 
table.newSnapshotReader().partitionEntries();
+        return partitions.stream()
+                .collect(
+                        Collectors.toMap(
+                                PartitionEntry::partition, 
PartitionEntry::lastFileCreationTime));
+    }
+
     private DataStreamSource<UnawareAppendCompactionTask> buildSource() {
+
         long scanInterval = 
table.coreOptions().continuousDiscoveryInterval().toMillis();
         BucketUnawareCompactSource source =
                 new BucketUnawareCompactSource(
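
The builder above applies the idle-partition check as a filter on the bounded stream of compaction tasks before handing it to the sink. The following is a rough stand-alone sketch of that wiring, with plain strings standing in for `UnawareAppendCompactionTask` and made-up timestamps (assumptions, not Paimon code):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the real topology filters UnawareAppendCompactionTask
// by its BinaryRow partition; here partitions are plain strings.
public class IdlePartitionFilterJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // the feature is batch-only

        // Precomputed per-partition last file creation times (made up).
        Map<String, Long> lastFileCreation = new HashMap<>();
        lastFileCreation.put("dt=20240810", Instant.now().minus(Duration.ofDays(8)).toEpochMilli());
        lastFileCreation.put("dt=20240818", Instant.now().toEpochMilli());

        // Cutoff derived from partition_idle_time (here: 1 day).
        long cutoff = Instant.now().minus(Duration.ofDays(1)).toEpochMilli();

        env.fromElements("dt=20240810", "dt=20240818") // pretend compaction tasks
                .filter(partition -> lastFileCreation.get(partition) <= cutoff)
                .print(); // only the idle partition survives

        env.execute("idle-partition-filter-sketch");
    }
}
```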
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
index 4fea9b635..755118941 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.flink.action.CompactDatabaseAction;
 import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.utils.TimeUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
 
@@ -98,6 +99,25 @@ public class CompactDatabaseProcedure extends ProcedureBase {
             String excludingTables,
             String tableOptions)
             throws Exception {
+        return call(
+                procedureContext,
+                includingDatabases,
+                mode,
+                includingTables,
+                excludingTables,
+                tableOptions,
+                "");
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String includingDatabases,
+            String mode,
+            String includingTables,
+            String excludingTables,
+            String tableOptions,
+            String partitionIdleTime)
+            throws Exception {
         String warehouse = catalog.warehouse();
         Map<String, String> catalogOptions = catalog.options();
         CompactDatabaseAction action =
@@ -109,6 +129,9 @@ public class CompactDatabaseProcedure extends ProcedureBase 
{
         if (!StringUtils.isBlank(tableOptions)) {
             
action.withTableOptions(parseCommaSeparatedKeyValues(tableOptions));
         }
+        if (!StringUtils.isBlank(partitionIdleTime)) {
+            
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
+        }
 
         return execute(procedureContext, action, "Compact database job");
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
index 2159cbe9e..384cdcdce 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
@@ -34,6 +34,9 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 
+import javax.annotation.Nullable;
+
+import java.time.Duration;
 import java.util.regex.Pattern;
 
 /**
@@ -49,6 +52,7 @@ public class CombinedTableCompactorSourceBuilder {
 
     private boolean isContinuous = false;
     private StreamExecutionEnvironment env;
+    @Nullable private Duration partitionIdleTime = null;
 
     public CombinedTableCompactorSourceBuilder(
             Catalog.Loader catalogLoader,
@@ -73,6 +77,12 @@ public class CombinedTableCompactorSourceBuilder {
         return this;
     }
 
+    public CombinedTableCompactorSourceBuilder withPartitionIdleTime(
+            @Nullable Duration partitionIdleTime) {
+        this.partitionIdleTime = partitionIdleTime;
+        return this;
+    }
+
     public DataStream<RowData> buildAwareBucketTableSource() {
         Preconditions.checkArgument(env != null, "StreamExecutionEnvironment 
should not be null.");
         RowType produceType = BucketsTable.getRowType();
@@ -94,7 +104,8 @@ public class CombinedTableCompactorSourceBuilder {
                     catalogLoader,
                     includingPattern,
                     excludingPattern,
-                    databasePattern);
+                    databasePattern,
+                    partitionIdleTime);
         }
     }
 
@@ -116,7 +127,8 @@ public class CombinedTableCompactorSourceBuilder {
                     catalogLoader,
                     includingPattern,
                     excludingPattern,
-                    databasePattern);
+                    databasePattern,
+                    partitionIdleTime);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index ee531a2e2..10fc3b558 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -19,26 +19,37 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.system.BucketsTable;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 
 /**
  * Source builder to build a Flink {@link StaticFileStoreSource} or {@link
@@ -52,6 +63,7 @@ public class CompactorSourceBuilder {
     private boolean isContinuous = false;
     private StreamExecutionEnvironment env;
     @Nullable private Predicate partitionPredicate = null;
+    @Nullable private Duration partitionIdleTime = null;
 
     public CompactorSourceBuilder(String tableIdentifier, FileStoreTable 
table) {
         this.tableIdentifier = tableIdentifier;
@@ -68,6 +80,11 @@ public class CompactorSourceBuilder {
         return this;
     }
 
+    public CompactorSourceBuilder withPartitionIdleTime(@Nullable Duration 
partitionIdleTime) {
+        this.partitionIdleTime = partitionIdleTime;
+        return this;
+    }
+
     private Source<RowData, ?, ?> buildSource(BucketsTable bucketsTable) {
 
         if (isContinuous) {
@@ -101,6 +118,25 @@ public class CompactorSourceBuilder {
                         WatermarkStrategy.noWatermarks(),
                         tableIdentifier + "-compact-source",
                         
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)));
+        if (isContinuous) {
+            Preconditions.checkArgument(
+                    partitionIdleTime == null, "Streaming mode does not 
support partitionIdleTime");
+        } else if (partitionIdleTime != null) {
+            Map<BinaryRow, Long> partitionInfo = 
getPartitionInfo(bucketsTable);
+            long historyMilli =
+                    LocalDateTime.now()
+                            .minus(partitionIdleTime)
+                            .atZone(ZoneId.systemDefault())
+                            .toInstant()
+                            .toEpochMilli();
+            SingleOutputStreamOperator<RowData> filterStream =
+                    dataStream.filter(
+                            rowData -> {
+                                BinaryRow partition = 
deserializeBinaryRow(rowData.getBinary(1));
+                                return partitionInfo.get(partition) <= 
historyMilli;
+                            });
+            dataStream = new DataStreamSource<>(filterStream);
+        }
         Integer parallelism =
                 
Options.fromMap(table.options()).get(FlinkConnectorOptions.SCAN_PARALLELISM);
         if (parallelism != null) {
@@ -138,4 +174,13 @@ public class CompactorSourceBuilder {
         this.partitionPredicate = partitionPredicate;
         return this;
     }
+
+    private Map<BinaryRow, Long> getPartitionInfo(BucketsTable table) {
+        List<PartitionEntry> partitions = 
table.newSnapshotReader().partitionEntries();
+
+        return partitions.stream()
+                .collect(
+                        Collectors.toMap(
+                                PartitionEntry::partition, 
PartitionEntry::lastFileCreationTime));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
index 22ef33029..cee6081aa 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.data.RowData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.regex.Pattern;
 
 import static 
org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.FINISHED;
@@ -98,7 +99,8 @@ public class CombinedAwareBatchSourceFunction
             Catalog.Loader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
-            Pattern databasePattern) {
+            Pattern databasePattern,
+            Duration partitionIdleTime) {
         CombinedAwareBatchSourceFunction function =
                 new CombinedAwareBatchSourceFunction(
                         catalogLoader, includingPattern, excludingPattern, 
databasePattern);
@@ -112,7 +114,10 @@ public class CombinedAwareBatchSourceFunction
                 .partitionCustom(
                         (key, numPartitions) -> key % numPartitions,
                         split -> ((DataSplit) split.f0).bucket())
-                .transform(name, typeInfo, new 
MultiTablesReadOperator(catalogLoader, false));
+                .transform(
+                        name,
+                        typeInfo,
+                        new MultiTablesReadOperator(catalogLoader, false, 
partitionIdleTime));
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
index 67579f2d0..b0b5a7784 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
@@ -20,9 +20,14 @@ package org.apache.paimon.flink.source.operator;
 
 import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.compact.MultiTableScanBase;
 import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan;
 import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.configuration.Configuration;
@@ -36,7 +41,13 @@ import 
org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.FINISHED;
 import static 
org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.IS_EMPTY;
@@ -96,7 +107,8 @@ public class CombinedUnawareBatchSourceFunction
             Catalog.Loader catalogLoader,
             Pattern includingPattern,
             Pattern excludingPattern,
-            Pattern databasePattern) {
+            Pattern databasePattern,
+            @Nullable Duration partitionIdleTime) {
         CombinedUnawareBatchSourceFunction function =
                 new CombinedUnawareBatchSourceFunction(
                         catalogLoader, includingPattern, excludingPattern, 
databasePattern);
@@ -115,6 +127,14 @@ public class CombinedUnawareBatchSourceFunction
                                 Boundedness.BOUNDED)
                         .forceNonParallel();
 
+        if (partitionIdleTime != null) {
+            source =
+                    source.transform(
+                            name,
+                            compactionTaskTypeInfo,
+                            new MultiUnawareTablesReadOperator(catalogLoader, 
partitionIdleTime));
+        }
+
         PartitionTransformation<MultiTableUnawareAppendCompactionTask> 
transformation =
                 new PartitionTransformation<>(
                         source.getTransformation(), new 
RebalancePartitioner<>());
@@ -122,6 +142,38 @@ public class CombinedUnawareBatchSourceFunction
         return new DataStream<>(env, transformation);
     }
 
+    private static Long getPartitionInfo(
+            Identifier tableIdentifier,
+            BinaryRow partition,
+            Map<Identifier, Map<BinaryRow, Long>> multiTablesPartitionInfo,
+            Catalog catalog) {
+        Map<BinaryRow, Long> partitionInfo = 
multiTablesPartitionInfo.get(tableIdentifier);
+        if (partitionInfo == null) {
+            try {
+                Table table = catalog.getTable(tableIdentifier);
+                if (!(table instanceof FileStoreTable)) {
+                    LOGGER.error(
+                            String.format(
+                                    "Only FileStoreTable supports compact 
action. The table type is '%s'.",
+                                    table.getClass().getName()));
+                }
+                FileStoreTable fileStoreTable = (FileStoreTable) table;
+                List<PartitionEntry> partitions =
+                        fileStoreTable.newSnapshotReader().partitionEntries();
+                partitionInfo =
+                        partitions.stream()
+                                .collect(
+                                        Collectors.toMap(
+                                                PartitionEntry::partition,
+                                                
PartitionEntry::lastFileCreationTime));
+                multiTablesPartitionInfo.put(tableIdentifier, partitionInfo);
+            } catch (Catalog.TableNotExistException e) {
+                LOGGER.error(String.format("table: %s not found.", 
tableIdentifier.getFullName()));
+            }
+        }
+        return partitionInfo.get(partition);
+    }
+
     @Override
     public void close() throws Exception {
         super.close();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
index d43245212..0b48a9bcc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
@@ -20,9 +20,11 @@ package org.apache.paimon.flink.source.operator;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
@@ -37,10 +39,16 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.flink.utils.MultiTablesCompactorUtil.compactOptions;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 
 /**
  * The operator that reads the Tuple2<{@link Split}, String> received from the 
preceding {@link
@@ -56,11 +64,20 @@ public class MultiTablesReadOperator extends 
AbstractStreamOperator<RowData>
     private final Catalog.Loader catalogLoader;
     private final boolean isStreaming;
 
+    private Duration partitionIdleTime = null;
+
     public MultiTablesReadOperator(Catalog.Loader catalogLoader, boolean 
isStreaming) {
         this.catalogLoader = catalogLoader;
         this.isStreaming = isStreaming;
     }
 
+    public MultiTablesReadOperator(
+            Catalog.Loader catalogLoader, boolean isStreaming, Duration 
partitionIdleTime) {
+        this.catalogLoader = catalogLoader;
+        this.isStreaming = isStreaming;
+        this.partitionIdleTime = partitionIdleTime;
+    }
+
     private transient Catalog catalog;
     private transient IOManager ioManager;
     private transient Map<Identifier, BucketsTable> tablesMap;
@@ -83,17 +100,33 @@ public class MultiTablesReadOperator extends 
AbstractStreamOperator<RowData>
 
         this.reuseRow = new FlinkRowData(null);
         this.reuseRecord = new StreamRecord<>(reuseRow);
+
+        if (isStreaming) {
+            Preconditions.checkArgument(
+                    partitionIdleTime == null, "Streaming mode does not 
support partitionIdleTime");
+        }
     }
 
     @Override
     public void processElement(StreamRecord<Tuple2<Split, String>> record) 
throws Exception {
         Identifier identifier = Identifier.fromString(record.getValue().f1);
         TableRead read = getTableRead(identifier);
+        Map<BinaryRow, Long> partitionInfo = 
getPartitionInfo(tablesMap.get(identifier));
         try (CloseableIterator<InternalRow> iterator =
                 read.createReader(record.getValue().f0).toCloseableIterator()) 
{
-            while (iterator.hasNext()) {
-                reuseRow.replace(iterator.next());
-                output.collect(reuseRecord);
+            if (partitionIdleTime == null) {
+                while (iterator.hasNext()) {
+                    reuseRow.replace(iterator.next());
+                    output.collect(reuseRecord);
+                }
+            } else {
+                while (iterator.hasNext()) {
+                    InternalRow row = iterator.next();
+                    if (checkIsHistoryPartition(row, partitionInfo)) {
+                        reuseRow.replace(row);
+                        output.collect(reuseRecord);
+                    }
+                }
             }
         }
     }
@@ -123,6 +156,26 @@ public class MultiTablesReadOperator extends 
AbstractStreamOperator<RowData>
         return readsMap.get(tableId);
     }
 
+    private Map<BinaryRow, Long> getPartitionInfo(BucketsTable table) {
+        List<PartitionEntry> partitions = 
table.newSnapshotReader().partitionEntries();
+
+        return partitions.stream()
+                .collect(
+                        Collectors.toMap(
+                                PartitionEntry::partition, 
PartitionEntry::lastFileCreationTime));
+    }
+
+    private boolean checkIsHistoryPartition(InternalRow row, Map<BinaryRow, 
Long> partitionInfo) {
+        BinaryRow partition = deserializeBinaryRow(row.getBinary(1));
+        long historyMilli =
+                LocalDateTime.now()
+                        .minus(partitionIdleTime)
+                        .atZone(ZoneId.systemDefault())
+                        .toInstant()
+                        .toEpochMilli();
+        return partitionInfo.get(partition) <= historyMilli;
+    }
+
     @Override
     public void close() throws Exception {
         super.close();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
new file mode 100644
index 000000000..4dba03cf4
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The operator is used for historical partition compaction. It reads {@link
+ * MultiTableUnawareAppendCompactionTask} records emitted by the preceding {@link
+ * CombinedUnawareBatchSourceFunction} and filters out partitions that are not historical.
+ */
+public class MultiUnawareTablesReadOperator
+        extends AbstractStreamOperator<MultiTableUnawareAppendCompactionTask>
+        implements OneInputStreamOperator<
+                MultiTableUnawareAppendCompactionTask, 
MultiTableUnawareAppendCompactionTask> {
+    private static final long serialVersionUID = 1L;
+
+    private final Catalog.Loader catalogLoader;
+
+    private final Duration partitionIdleTime;
+
+    public MultiUnawareTablesReadOperator(
+            Catalog.Loader catalogLoader, Duration partitionIdleTime) {
+        this.catalogLoader = catalogLoader;
+        this.partitionIdleTime = partitionIdleTime;
+    }
+
+    private transient Catalog catalog;
+    private transient Map<Identifier, FileStoreTable> tablesMap;
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        tablesMap = new HashMap<>();
+        catalog = catalogLoader.load();
+    }
+
+    @Override
+    public void 
processElement(StreamRecord<MultiTableUnawareAppendCompactionTask> record) {
+        Identifier identifier = record.getValue().tableIdentifier();
+        BinaryRow partition = record.getValue().partition();
+        FileStoreTable table = getTable(identifier);
+        Map<BinaryRow, Long> partitionInfo = getPartitionInfo(table);
+        if (checkIsHistoryPartition(partition, partitionInfo)) {
+            output.collect(record);
+        }
+    }
+
+    private FileStoreTable getTable(Identifier tableId) {
+        FileStoreTable table = tablesMap.get(tableId);
+        if (table == null) {
+            try {
+                Table newTable = catalog.getTable(tableId);
+                Preconditions.checkArgument(
+                        newTable instanceof FileStoreTable,
+                        "Only FileStoreTable supports compact action. The 
table type is '%s'.",
+                        newTable.getClass().getName());
+                table = (FileStoreTable) newTable;
+                tablesMap.put(tableId, table);
+            } catch (Catalog.TableNotExistException e) {
+                LOG.error(String.format("table: %s not found.", 
tableId.getFullName()));
+            }
+        }
+
+        return table;
+    }
+
+    private Map<BinaryRow, Long> getPartitionInfo(FileStoreTable table) {
+        List<PartitionEntry> partitions = 
table.newSnapshotReader().partitionEntries();
+        return partitions.stream()
+                .collect(
+                        Collectors.toMap(
+                                PartitionEntry::partition, 
PartitionEntry::lastFileCreationTime));
+    }
+
+    private boolean checkIsHistoryPartition(
+            BinaryRow partition, Map<BinaryRow, Long> partitionInfo) {
+        long historyMilli =
+                LocalDateTime.now()
+                        .minus(partitionIdleTime)
+                        .atZone(ZoneId.systemDefault())
+                        .toInstant()
+                        .toEpochMilli();
+        return partitionInfo.get(partition) <= historyMilli;
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (catalog != null) {
+            catalog.close();
+        }
+    }
+}
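
The commit wires this operator into the combined unaware-bucket compaction topology (downstream of CombinedUnawareBatchSourceFunction). The hedged sketch below only shows the shape of that step with the plain DataStream API; the variable names, the operator name string, and the use of TypeInformation.of(...) are assumptions for illustration, not the commit's actual wiring:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.source.operator.MultiUnawareTablesReadOperator;

import java.time.Duration;

public class WireHistoryPartitionFilter {
    // Chains the history-partition filter onto an existing stream of compaction tasks.
    public static DataStream<MultiTableUnawareAppendCompactionTask> filterHistoryPartitions(
            DataStream<MultiTableUnawareAppendCompactionTask> tasks,
            Catalog.Loader catalogLoader,
            Duration partitionIdleTime) {
        return tasks.transform(
                "HistoryPartitionFilter",
                TypeInformation.of(MultiTableUnawareAppendCompactionTask.class),
                new MultiUnawareTablesReadOperator(catalogLoader, partitionIdleTime));
    }
}
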
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java
index fc648b468..27264eb80 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java
@@ -62,6 +62,20 @@ public class MultiTablesCompactorUtil {
         }
     }
 
+    public static Map<String, String> partitionCompactOptions() {
+
+        return new HashMap<String, String>() {
+            {
+                put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null);
+                put(CoreOptions.SCAN_TIMESTAMP.key(), null);
+                put(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS.key(), null);
+                put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null);
+                put(CoreOptions.SCAN_MODE.key(), 
CoreOptions.StartupMode.LATEST_FULL.toString());
+                put(CoreOptions.WRITE_ONLY.key(), "false");
+            }
+        };
+    }
+
     public static boolean shouldCompactTable(
             Identifier tableIdentifier, Pattern includingPattern, Pattern 
excludingPattern) {
         String paimonFullTableName = tableIdentifier.getFullName();
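
These overrides are meant to be applied as dynamic options before planning the compaction scan: a null value clears a previously set scan-position option, the scan mode is pinned to latest-full, and write-only is switched off so compaction may run. A hedged sketch of applying them (the helper class below is illustrative and assumes, as the null entries suggest, that FileStoreTable.copy treats a null value as removing that option):

import org.apache.paimon.flink.utils.MultiTablesCompactorUtil;
import org.apache.paimon.table.FileStoreTable;

public class ApplyCompactOptions {
    // Returns a table view suitable for planning compaction: scan-position options are
    // cleared, scan mode is forced to latest-full, and write-only is switched off.
    static FileStoreTable forCompaction(FileStoreTable table) {
        return table.copy(MultiTablesCompactorUtil.partitionCompactOptions());
    }
}
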
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 9094239c9..ba95113d9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -39,6 +39,8 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -169,6 +171,79 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
                 String.format("Cannot validate snapshot expiration in %s 
milliseconds.", 60_000));
     }
 
+    @ParameterizedTest(name = "mode = {0}")
+    @ValueSource(booleans = {true, false})
+    @Timeout(60)
+    public void testHistoryPartitionCompact(boolean mode) throws Exception {
+        String partitionIdleTime = "5s";
+        FileStoreTable table;
+        if (mode) {
+            table =
+                    prepareTable(
+                            Arrays.asList("dt", "hh"),
+                            Arrays.asList("dt", "hh", "k"),
+                            Collections.emptyList(),
+                            
Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
+        } else {
+            // for unaware bucket table
+            Map<String, String> tableOptions = new HashMap<>();
+            tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+            tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+            tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+
+            table =
+                    prepareTable(
+                            Arrays.asList("dt", "hh"),
+                            Collections.emptyList(),
+                            Collections.emptyList(),
+                            tableOptions);
+        }
+
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        writeData(
+                rowData(2, 100, 15, BinaryString.fromString("20221208")),
+                rowData(2, 100, 16, BinaryString.fromString("20221208")),
+                rowData(2, 100, 15, BinaryString.fromString("20221209")));
+
+        Thread.sleep(5000);
+        writeData(rowData(3, 100, 16, BinaryString.fromString("20221208")));
+        checkLatestSnapshot(table, 3, Snapshot.CommitKind.APPEND);
+
+        CompactAction action =
+                createAction(
+                        CompactAction.class,
+                        "compact",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--partition_idle_time",
+                        partitionIdleTime);
+        StreamExecutionEnvironment env = 
streamExecutionEnvironmentBuilder().batchMode().build();
+        action.withStreamExecutionEnvironment(env).build();
+        env.execute();
+
+        checkLatestSnapshot(table, 4, Snapshot.CommitKind.COMPACT);
+
+        List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits.size()).isEqualTo(3);
+        for (DataSplit split : splits) {
+            if (split.partition().getInt(1) == 15) {
+                // compacted
+                assertThat(split.dataFiles().size()).isEqualTo(1);
+            } else {
+                // not compacted
+                assertThat(split.dataFiles().size()).isEqualTo(3);
+            }
+        }
+    }
+
     @Test
     public void testUnawareBucketStreamingCompact() throws Exception {
         Map<String, String> tableOptions = new HashMap<>();
@@ -294,6 +369,39 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
                 .hasMessage("Only parition key can be specialized in 
compaction action.");
     }
 
+    @Test
+    public void testWrongUsage() throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+
+        prepareTable(
+                Collections.singletonList("v"),
+                Arrays.asList(),
+                Collections.emptyList(),
+                tableOptions);
+
+        // partition_idle_time cannot be used with order_strategy
+        Assertions.assertThatThrownBy(
+                        () ->
+                                createAction(
+                                        CompactAction.class,
+                                        "compact",
+                                        "--warehouse",
+                                        warehouse,
+                                        "--database",
+                                        database,
+                                        "--table",
+                                        tableName,
+                                        "--partition_idle_time",
+                                        "5s",
+                                        "--order_strategy",
+                                        "zorder",
+                                        "--order_by",
+                                        "dt,hh"))
+                .hasMessage("sort compact do not support 
'partition_idle_time'.");
+    }
+
     private FileStoreTable prepareTable(
             List<String> partitionKeys,
             List<String> primaryKeys,
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index 80b8e3798..c5008b216 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -500,6 +500,107 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
         }
     }
 
+    @ParameterizedTest(name = "mode = {0}")
+    @ValueSource(strings = {"divided", "combined"})
+    @Timeout(60)
+    public void testHistoryPartitionCompact(String mode) throws Exception {
+        List<FileStoreTable> tables = new ArrayList<>();
+        String partitionIdleTime = "10s";
+
+        for (String dbName : DATABASE_NAMES) {
+            for (String tableName : TABLE_NAMES) {
+                Map<String, String> option = new HashMap<>();
+                option.put(CoreOptions.WRITE_ONLY.key(), "true");
+                List<String> keys;
+                if (tableName.endsWith("unaware_bucket")) {
+                    option.put("bucket", "-1");
+                    option.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+                    option.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+                    keys = Lists.newArrayList();
+                } else {
+                    option.put("bucket", "1");
+                    keys = Arrays.asList("dt", "hh", "k");
+                }
+                FileStoreTable table =
+                        createTable(dbName, tableName, Arrays.asList("dt", 
"hh"), keys, option);
+                tables.add(table);
+                SnapshotManager snapshotManager = table.snapshotManager();
+                StreamWriteBuilder streamWriteBuilder =
+                        
table.newStreamWriteBuilder().withCommitUser(commitUser);
+                write = streamWriteBuilder.newWrite();
+                commit = streamWriteBuilder.newCommit();
+
+                writeData(
+                        rowData(1, 100, 15, 
BinaryString.fromString("20221208")),
+                        rowData(1, 100, 16, 
BinaryString.fromString("20221208")),
+                        rowData(1, 100, 15, 
BinaryString.fromString("20221209")));
+
+                writeData(
+                        rowData(2, 100, 15, 
BinaryString.fromString("20221208")),
+                        rowData(2, 100, 16, 
BinaryString.fromString("20221208")),
+                        rowData(2, 100, 15, 
BinaryString.fromString("20221209")));
+
+                Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+                assertThat(snapshot.id()).isEqualTo(2);
+                
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+                write.close();
+                commit.close();
+            }
+        }
+
+        // sleep 10s, then update partition 20221208-16
+        Thread.sleep(10000);
+        for (FileStoreTable table : tables) {
+            StreamWriteBuilder streamWriteBuilder =
+                    table.newStreamWriteBuilder().withCommitUser(commitUser);
+            write = streamWriteBuilder.newWrite();
+            commit = streamWriteBuilder.newCommit();
+            writeData(rowData(3, 100, 16, 
BinaryString.fromString("20221208")));
+        }
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            StreamExecutionEnvironment env =
+                    streamExecutionEnvironmentBuilder().batchMode().build();
+            createAction(
+                            CompactDatabaseAction.class,
+                            "compact_database",
+                            "--warehouse",
+                            warehouse,
+                            "--mode",
+                            mode,
+                            "--partition_idle_time",
+                            partitionIdleTime)
+                    .withStreamExecutionEnvironment(env)
+                    .build();
+            env.execute();
+        } else {
+            callProcedure(
+                    String.format(
+                            "CALL sys.compact_database('', 
'%s','','','','%s')",
+                            mode, partitionIdleTime),
+                    false,
+                    true);
+        }
+
+        for (FileStoreTable table : tables) {
+            SnapshotManager snapshotManager = table.snapshotManager();
+            Snapshot snapshot =
+                    
table.snapshotManager().snapshot(snapshotManager.latestSnapshotId());
+            assertThat(snapshot.id()).isEqualTo(4);
+            
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+
+            List<DataSplit> splits = 
table.newSnapshotReader().read().dataSplits();
+            assertThat(splits.size()).isEqualTo(3);
+            for (DataSplit split : splits) {
+                if (split.partition().getInt(1) == 16) {
+                    assertThat(split.dataFiles().size()).isEqualTo(3);
+                } else {
+                    assertThat(split.dataFiles().size()).isEqualTo(1);
+                }
+            }
+        }
+    }
+
     @ParameterizedTest(name = "mode = {0}")
     @ValueSource(strings = {"divided", "combined"})
     @Timeout(60)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index f7db0dfcf..9afd04c8d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -51,6 +51,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -287,6 +288,54 @@ public class CompactorSourceITCase extends 
AbstractTestBase {
         it.close();
     }
 
+    @ParameterizedTest(name = "defaultOptions = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testHistoryPartitionRead(boolean defaultOptions) throws 
Exception {
+        Duration partitionIdleTime = Duration.ofMillis(3000);
+        FileStoreTable table = createFileStoreTable();
+        if (!defaultOptions) {
+            // change options to test whether CompactorSourceBuilder works normally
+            table = 
table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2"));
+        }
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
+
+        write.write(rowData(1, 1510, BinaryString.fromString("20221208"), 15));
+        write.write(rowData(1, 1620, BinaryString.fromString("20221208"), 16));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(2, 1511, BinaryString.fromString("20221208"), 15));
+        write.write(rowData(2, 1510, BinaryString.fromString("20221209"), 15));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        Thread.sleep(3000);
+        write.write(rowData(3, 1510, BinaryString.fromString("20221208"), 16));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder().streamingMode().build();
+        DataStreamSource<RowData> compactorSource =
+                new CompactorSourceBuilder("test", table)
+                        .withContinuousMode(false)
+                        .withPartitionIdleTime(partitionIdleTime)
+                        .withEnv(env)
+                        .build();
+        CloseableIterator<RowData> it = compactorSource.executeAndCollect();
+
+        List<String> actual = new ArrayList<>();
+        while (it.hasNext()) {
+            actual.add(toString(it.next()));
+        }
+        assertThat(actual)
+                .hasSameElementsAs(Arrays.asList("+I 3|20221208|15|0|0", "+I 
3|20221209|15|0|0"));
+
+        write.close();
+        commit.close();
+        it.close();
+    }
+
     private String toString(RowData rowData) {
         int numFiles;
         try {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
index d1acde411..fba5f3380 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
@@ -53,6 +53,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.UncheckedIOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -491,6 +492,109 @@ public class MultiTablesCompactorSourceBuilderITCase 
extends AbstractTestBase
         it.close();
     }
 
+    @ParameterizedTest(name = "defaultOptions = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testHistoryPatitionRead(boolean defaultOptions) throws 
Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.WRITE_ONLY.key(), "true");
+        if (!defaultOptions) {
+            // change options to test whether CombinedTableCompactorSourceBuilder works normally
+            options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
+        }
+        options.put("bucket", "1");
+        long monitorInterval = 1000;
+        Duration partitionIdleTime = Duration.ofMillis(3000);
+        List<FileStoreTable> tables = new ArrayList<>();
+
+        for (String dbName : DATABASE_NAMES) {
+            for (String tableName : TABLE_NAMES) {
+                FileStoreTable table =
+                        createTable(
+                                dbName,
+                                tableName,
+                                ROW_TYPE_MAP.get(tableName),
+                                Arrays.asList("dt", "hh"),
+                                Arrays.asList("dt", "hh", "k"),
+                                options);
+                tables.add(table);
+                SnapshotManager snapshotManager = table.snapshotManager();
+                StreamWriteBuilder streamWriteBuilder =
+                        
table.newStreamWriteBuilder().withCommitUser(commitUser);
+                StreamTableWrite write = streamWriteBuilder.newWrite();
+                StreamTableCommit commit = streamWriteBuilder.newCommit();
+
+                writeData(
+                        write,
+                        commit,
+                        0,
+                        rowData(1, 100, 15, 
BinaryString.fromString("20221208")),
+                        rowData(1, 100, 16, 
BinaryString.fromString("20221208")),
+                        rowData(1, 100, 15, 
BinaryString.fromString("20221209")));
+
+                writeData(
+                        write,
+                        commit,
+                        1,
+                        rowData(2, 100, 15, 
BinaryString.fromString("20221208")),
+                        rowData(2, 100, 16, 
BinaryString.fromString("20221208")),
+                        rowData(2, 100, 15, 
BinaryString.fromString("20221209")));
+
+                Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+                assertThat(snapshot.id()).isEqualTo(2);
+                
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+
+                write.close();
+                commit.close();
+            }
+        }
+
+        // sleep 3 seconds, and update partition 20221208-16
+        Thread.sleep(3000);
+
+        for (FileStoreTable table : tables) {
+            StreamWriteBuilder streamWriteBuilder =
+                    table.newStreamWriteBuilder().withCommitUser(commitUser);
+            StreamTableWrite write = streamWriteBuilder.newWrite();
+            StreamTableCommit commit = streamWriteBuilder.newCommit();
+            writeData(write, commit, 2, rowData(3, 100, 16, 
BinaryString.fromString("20221208")));
+        }
+
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder()
+                        .batchMode()
+                        .parallelism(ThreadLocalRandom.current().nextInt(2) + 
1)
+                        .build();
+
+        DataStream<RowData> source =
+                new CombinedTableCompactorSourceBuilder(
+                                catalogLoader(),
+                                Pattern.compile("db1|db2"),
+                                Pattern.compile(".*"),
+                                null,
+                                monitorInterval)
+                        .withPartitionIdleTime(partitionIdleTime)
+                        .withContinuousMode(false)
+                        .withEnv(env)
+                        .buildAwareBucketTableSource();
+        CloseableIterator<RowData> it = source.executeAndCollect();
+        List<String> actual = new ArrayList<>();
+        while (it.hasNext()) {
+            actual.add(toString(it.next()));
+        }
+        assertThat(actual)
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                "+I 3|20221208|15|0|0|db1|t1",
+                                "+I 3|20221209|15|0|0|db1|t1",
+                                "+I 3|20221208|15|0|0|db1|t2",
+                                "+I 3|20221209|15|0|0|db1|t2",
+                                "+I 3|20221208|15|0|0|db2|t1",
+                                "+I 3|20221209|15|0|0|db2|t1",
+                                "+I 3|20221208|15|0|0|db2|t2",
+                                "+I 3|20221209|15|0|0|db2|t2"));
+        it.close();
+    }
+
     private FileStoreTable createTable(
             String databaseName,
             String tableName,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 1c1d73860..b541249bd 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -24,6 +24,7 @@ import org.apache.paimon.append.UnawareAppendCompactionTask;
 import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.spark.PaimonSplitScan;
@@ -48,6 +49,7 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ParameterUtils;
 import org.apache.paimon.utils.SerializationUtils;
 import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.utils.TimeUtils;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -71,6 +73,9 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -80,6 +85,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -113,6 +119,7 @@ public class CompactProcedure extends BaseProcedure {
                 ProcedureParameter.optional("where", StringType),
                 ProcedureParameter.optional("max_concurrent_jobs", 
IntegerType),
                 ProcedureParameter.optional("options", StringType),
+                ProcedureParameter.optional("partition_idle_time", StringType),
             };
 
     private static final StructType OUTPUT_TYPE =
@@ -147,10 +154,16 @@ public class CompactProcedure extends BaseProcedure {
         String where = blank(args, 4) ? null : args.getString(4);
         int maxConcurrentJobs = args.isNullAt(5) ? 15 : args.getInt(5);
         String options = args.isNullAt(6) ? null : args.getString(6);
+        Duration partitionIdleTime =
+                blank(args, 7) ? null : 
TimeUtils.parseDuration(args.getString(7));
         if (TableSorter.OrderType.NONE.name().equals(sortType) && 
!sortColumns.isEmpty()) {
             throw new IllegalArgumentException(
                     "order_strategy \"none\" cannot work with order_by 
columns.");
         }
+        if (partitionIdleTime != null && 
(!TableSorter.OrderType.NONE.name().equals(sortType))) {
+            throw new IllegalArgumentException(
+                    "sort compact do not support 'partition_idle_time'.");
+        }
         checkArgument(
                 partitions == null || where == null,
                 "partitions and where cannot be used together.");
@@ -193,7 +206,8 @@ public class CompactProcedure extends BaseProcedure {
                                             sortColumns,
                                             relation,
                                             condition,
-                                            maxConcurrentJobs));
+                                            maxConcurrentJobs,
+                                            partitionIdleTime));
                     return new InternalRow[] {internalRow};
                 });
     }
@@ -213,7 +227,8 @@ public class CompactProcedure extends BaseProcedure {
             List<String> sortColumns,
             DataSourceV2Relation relation,
             @Nullable Expression condition,
-            int maxConcurrentJobs) {
+            int maxConcurrentJobs,
+            @Nullable Duration partitionIdleTime) {
         BucketMode bucketMode = table.bucketMode();
         TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
         Predicate filter =
@@ -230,10 +245,10 @@ public class CompactProcedure extends BaseProcedure {
             switch (bucketMode) {
                 case HASH_FIXED:
                 case HASH_DYNAMIC:
-                    compactAwareBucketTable(table, filter, javaSparkContext);
+                    compactAwareBucketTable(table, filter, partitionIdleTime, 
javaSparkContext);
                     break;
                 case BUCKET_UNAWARE:
-                    compactUnAwareBucketTable(table, filter, javaSparkContext);
+                    compactUnAwareBucketTable(table, filter, 
partitionIdleTime, javaSparkContext);
                     break;
                 default:
                     throw new UnsupportedOperationException(
@@ -256,17 +271,22 @@ public class CompactProcedure extends BaseProcedure {
     }
 
     private void compactAwareBucketTable(
-            FileStoreTable table, @Nullable Predicate filter, JavaSparkContext 
javaSparkContext) {
+            FileStoreTable table,
+            @Nullable Predicate filter,
+            @Nullable Duration partitionIdleTime,
+            JavaSparkContext javaSparkContext) {
         SnapshotReader snapshotReader = table.newSnapshotReader();
         if (filter != null) {
             snapshotReader.withFilter(filter);
         }
-
+        Set<BinaryRow> partitionToBeCompacted =
+                getHistoryPartition(snapshotReader, partitionIdleTime);
         List<Pair<byte[], Integer>> partitionBuckets =
                 snapshotReader.read().splits().stream()
                         .map(split -> (DataSplit) split)
                         .map(dataSplit -> Pair.of(dataSplit.partition(), 
dataSplit.bucket()))
                         .distinct()
+                        .filter(pair -> 
partitionToBeCompacted.contains(pair.getKey()))
                         .map(
                                 p ->
                                         Pair.of(
@@ -329,9 +349,30 @@ public class CompactProcedure extends BaseProcedure {
     }
 
     private void compactUnAwareBucketTable(
-            FileStoreTable table, @Nullable Predicate filter, JavaSparkContext 
javaSparkContext) {
+            FileStoreTable table,
+            @Nullable Predicate filter,
+            @Nullable Duration partitionIdleTime,
+            JavaSparkContext javaSparkContext) {
         List<UnawareAppendCompactionTask> compactionTasks =
                 new UnawareAppendTableCompactionCoordinator(table, false, 
filter).run();
+        if (partitionIdleTime != null) {
+            Map<BinaryRow, Long> partitionInfo =
+                    table.newSnapshotReader().partitionEntries().stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            PartitionEntry::partition,
+                                            
PartitionEntry::lastFileCreationTime));
+            long historyMilli =
+                    LocalDateTime.now()
+                            .minus(partitionIdleTime)
+                            .atZone(ZoneId.systemDefault())
+                            .toInstant()
+                            .toEpochMilli();
+            compactionTasks =
+                    compactionTasks.stream()
+                            .filter(task -> 
partitionInfo.get(task.partition()) <= historyMilli)
+                            .collect(Collectors.toList());
+        }
         if (compactionTasks.isEmpty()) {
             return;
         }
@@ -392,6 +433,31 @@ public class CompactProcedure extends BaseProcedure {
         }
     }
 
+    private Set<BinaryRow> getHistoryPartition(
+            SnapshotReader snapshotReader, @Nullable Duration 
partitionIdleTime) {
+        Set<Pair<BinaryRow, Long>> partitionInfo =
+                snapshotReader.partitionEntries().stream()
+                        .map(
+                                partitionEntry ->
+                                        Pair.of(
+                                                partitionEntry.partition(),
+                                                
partitionEntry.lastFileCreationTime()))
+                        .collect(Collectors.toSet());
+        if (partitionIdleTime != null) {
+            long historyMilli =
+                    LocalDateTime.now()
+                            .minus(partitionIdleTime)
+                            .atZone(ZoneId.systemDefault())
+                            .toInstant()
+                            .toEpochMilli();
+            partitionInfo =
+                    partitionInfo.stream()
+                            .filter(partition -> partition.getValue() <= 
historyMilli)
+                            .collect(Collectors.toSet());
+        }
+        return 
partitionInfo.stream().map(Pair::getKey).collect(Collectors.toSet());
+    }
+
     private void sortCompactUnAwareBucketTable(
             FileStoreTable table,
             TableSorter.OrderType orderType,
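
The new partition_idle_time argument is parsed with Paimon's TimeUtils.parseDuration, so it accepts the same short duration literals the tests in this commit use. A minimal sketch (class name illustrative):

import org.apache.paimon.utils.TimeUtils;

import java.time.Duration;

public class ParsePartitionIdleTime {
    public static void main(String[] args) {
        // "5s" is the same literal the tests in this commit pass as partition_idle_time
        Duration idle = TimeUtils.parseDuration("5s");
        System.out.println(idle.toMillis()); // expected: 5000
    }
}
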
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 73d6fc442..c522c1c9b 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.Snapshot.CommitKind
 import org.apache.paimon.fs.Path
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.source.DataSplit
 
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
@@ -480,6 +481,11 @@ abstract class CompactProcedureTestBase extends 
PaimonSparkTestBase with StreamT
     assert(intercept[IllegalArgumentException] {
       spark.sql("CALL sys.compact(table => 'T', order_strategy => 'sort', 
order_by => 'pt')")
     }.getMessage.contains("order_by should not contain partition cols"))
+
+    assert(intercept[IllegalArgumentException] {
+      spark.sql(
+        "CALL sys.compact(table => 'T', order_strategy => 'sort', order_by => 
'id', partition_idle_time =>'5s')")
+    }.getMessage.contains("sort compact do not support 'partition_idle_time'"))
   }
 
   test("Paimon Procedure: compact with where") {
@@ -561,6 +567,87 @@ abstract class CompactProcedureTestBase extends 
PaimonSparkTestBase with StreamT
         Row(5, "e", "p1") :: Row(6, "f", "p2") :: Nil)
   }
 
+  test("Paimon Procedure: compact with partition_idle_time for pk table") {
+    Seq(1, -1).foreach(
+      bucket => {
+        withTable("T") {
+          val dynamicBucketArgs = if (bucket == -1) " 
,'dynamic-bucket.initial-buckets'='1'" else ""
+          spark.sql(
+            s"""
+               |CREATE TABLE T (id INT, value STRING, dt STRING, hh INT)
+               |TBLPROPERTIES ('primary-key'='id, dt, hh', 'bucket'='$bucket', 
'write-only'='true'$dynamicBucketArgs)
+               |PARTITIONED BY (dt, hh)
+               |""".stripMargin)
+
+          val table = loadTable("T")
+
+          spark.sql(s"INSERT INTO T VALUES (1, '1', '2024-01-01', 0), (2, '2', 
'2024-01-01', 1)")
+          spark.sql(s"INSERT INTO T VALUES (5, '5', '2024-01-02', 0), (6, '6', 
'2024-01-02', 1)")
+          spark.sql(s"INSERT INTO T VALUES (3, '3', '2024-01-01', 0), (4, '4', 
'2024-01-01', 1)")
+          spark.sql(s"INSERT INTO T VALUES (7, '7', '2024-01-02', 0), (8, '8', 
'2024-01-02', 1)")
+
+          Thread.sleep(10000);
+          spark.sql(s"INSERT INTO T VALUES (9, '9', '2024-01-01', 0), (10, 
'10', '2024-01-02', 0)")
+
+          spark.sql("CALL sys.compact(table => 'T', partition_idle_time => 
'10s')")
+          val dataSplits = 
table.newSnapshotReader.read.dataSplits.asScala.toList
+          Assertions
+            .assertThat(dataSplits.size)
+            .isEqualTo(4)
+          
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+          for (dataSplit: DataSplit <- dataSplits) {
+            if (dataSplit.partition().getInt(1) == 0) {
+              Assertions
+                .assertThat(dataSplit.dataFiles().size())
+                .isEqualTo(3)
+            } else {
+              Assertions
+                .assertThat(dataSplit.dataFiles().size())
+                .isEqualTo(1)
+            }
+          }
+        }
+      })
+
+  }
+
+  test("Paimon Procedure: compact with partition_idle_time for unaware bucket 
append table") {
+    spark.sql(
+      s"""
+         |CREATE TABLE T (id INT, value STRING, dt STRING, hh INT)
+         |TBLPROPERTIES ('bucket'='-1', 'write-only'='true', 
'compaction.min.file-num'='2', 'compaction.max.file-num'='2')
+         |PARTITIONED BY (dt, hh)
+         |""".stripMargin)
+
+    val table = loadTable("T")
+
+    spark.sql(s"INSERT INTO T VALUES (1, '1', '2024-01-01', 0), (2, '2', 
'2024-01-01', 1)")
+    spark.sql(s"INSERT INTO T VALUES (5, '5', '2024-01-02', 0), (6, '6', 
'2024-01-02', 1)")
+    spark.sql(s"INSERT INTO T VALUES (3, '3', '2024-01-01', 0), (4, '4', 
'2024-01-01', 1)")
+    spark.sql(s"INSERT INTO T VALUES (7, '7', '2024-01-02', 0), (8, '8', 
'2024-01-02', 1)")
+
+    Thread.sleep(10000);
+    spark.sql(s"INSERT INTO T VALUES (9, '9', '2024-01-01', 0), (10, '10', 
'2024-01-02', 0)")
+
+    spark.sql("CALL sys.compact(table => 'T', partition_idle_time => '10s')")
+    val dataSplits = table.newSnapshotReader.read.dataSplits.asScala.toList
+    Assertions
+      .assertThat(dataSplits.size)
+      .isEqualTo(4)
+    
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+    for (dataSplit: DataSplit <- dataSplits) {
+      if (dataSplit.partition().getInt(1) == 0) {
+        Assertions
+          .assertThat(dataSplit.dataFiles().size())
+          .isEqualTo(3)
+      } else {
+        Assertions
+          .assertThat(dataSplit.dataFiles().size())
+          .isEqualTo(1)
+      }
+    }
+  }
+
   def lastSnapshotCommand(table: FileStoreTable): CommitKind = {
     table.snapshotManager().latestSnapshot().commitKind()
   }
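
Putting the Spark side together, a hedged usage sketch with the Java Spark API; the session setup and table name 'T' are placeholders, and the CALL statement mirrors the ones exercised by the tests above:

import org.apache.spark.sql.SparkSession;

public class CompactIdlePartitions {
    public static void main(String[] args) {
        // Assumes a session already configured with the Paimon Spark catalog and extensions
        SparkSession spark = SparkSession.builder().getOrCreate();
        // Only partitions that received no new data within the last 10 seconds are compacted
        spark.sql("CALL sys.compact(table => 'T', partition_idle_time => '10s')");
    }
}
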
