This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f3be98671 [flink][spark] support history partition compaction in
dedicated compaction job. (#3953)
f3be98671 is described below
commit f3be98671de69639c9de2381eeae84aa4de2db52
Author: LsomeYeah <[email protected]>
AuthorDate: Sun Aug 18 22:22:47 2024 +0800
[flink][spark] support history partition compaction in dedicated compaction
job. (#3953)
---
docs/content/flink/procedures.md | 3 +
docs/content/maintenance/dedicated-compaction.md | 115 +++++++++++++++++-
docs/content/spark/procedures.md | 4 +-
.../paimon/flink/procedure/CompactProcedure.java | 29 +++++
.../ProcedurePositionalArgumentsITCase.java | 3 +
.../apache/paimon/flink/action/CompactAction.java | 17 ++-
.../paimon/flink/action/CompactActionFactory.java | 21 +++-
.../paimon/flink/action/CompactDatabaseAction.java | 26 ++++-
.../flink/action/CompactDatabaseActionFactory.java | 28 ++++-
.../UnawareBucketCompactionTopoBuilder.java | 44 +++++++
.../flink/procedure/CompactDatabaseProcedure.java | 23 ++++
.../CombinedTableCompactorSourceBuilder.java | 16 ++-
.../flink/source/CompactorSourceBuilder.java | 45 +++++++
.../operator/CombinedAwareBatchSourceFunction.java | 9 +-
.../CombinedUnawareBatchSourceFunction.java | 54 ++++++++-
.../source/operator/MultiTablesReadOperator.java | 59 +++++++++-
.../operator/MultiUnawareTablesReadOperator.java | 129 +++++++++++++++++++++
.../flink/utils/MultiTablesCompactorUtil.java | 14 +++
.../paimon/flink/action/CompactActionITCase.java | 108 +++++++++++++++++
.../flink/action/CompactDatabaseActionITCase.java | 101 ++++++++++++++++
.../paimon/flink/source/CompactorSourceITCase.java | 49 ++++++++
.../MultiTablesCompactorSourceBuilderITCase.java | 104 +++++++++++++++++
.../paimon/spark/procedure/CompactProcedure.java | 80 +++++++++++--
.../spark/procedure/CompactProcedureTestBase.scala | 87 ++++++++++++++
24 files changed, 1134 insertions(+), 34 deletions(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 6209791b8..f028659a5 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -68,6 +68,7 @@ All available procedures are listed below.
<li>order_by(optional): the columns to sort by. Leave this empty if
'order_strategy' is 'none'.</li>
<li>options(optional): additional dynamic options of the
table.</li>
<li>where(optional): partition predicate (cannot be used together
with "partitions"). Note: since where is a keyword, a pair of backticks must be
added around it, like `where`.</li>
+ <li>partition_idle_time(optional): triggers a full
compaction for partitions that have not received any new data for
'partition_idle_time'. Only these partitions will be compacted. This
argument cannot be used together with order compact.</li>
</td>
<td>
-- use partition filter <br/>
@@ -85,6 +86,7 @@ All available procedures are listed below.
CALL [catalog.]sys.compact_database('includingDatabases', 'mode',
'includingTables') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode',
'includingTables', 'excludingTables') <br/><br/>
CALL [catalog.]sys.compact_database('includingDatabases', 'mode',
'includingTables', 'excludingTables', 'tableOptions')
+ CALL [catalog.]sys.compact_database('includingDatabases', 'mode',
'includingTables', 'excludingTables', 'tableOptions', 'partitionIdleTime')
</td>
<td>
To compact databases. Arguments:
@@ -95,6 +97,7 @@ All available procedures are listed below.
<li>includingTables: to specify tables. You can use regular
expression.</li>
<li>excludingTables: to specify tables that are not compacted. You
can use regular expression.</li>
<li>tableOptions: additional dynamic options of the table.</li>
+ <li>partition_idle_time: triggers a full compaction for
partitions that have not received any new data for 'partition_idle_time'.
Only these partitions will be compacted.</li>
</td>
<td>
CALL sys.compact_database('db1|db2', 'combined', 'table_.*',
'ignore', 'sink.parallelism=4')
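For reference, a minimal sketch of calling the new six-argument `compact_database` signature from a Flink `TableEnvironment` in batch mode; the catalog name and warehouse path below are placeholders, not part of this change:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactDatabaseCall {
    public static void main(String[] args) {
        // partition_idle_time is only supported in batch execution mode.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Placeholder catalog: register and select a Paimon catalog first.
        tEnv.executeSql(
                "CREATE CATALOG my_paimon WITH ('type' = 'paimon', 'warehouse' = '/tmp/warehouse')");
        tEnv.executeSql("USE CATALOG my_paimon");
        // Full-compact only partitions that have been idle for at least 60 seconds.
        tEnv.executeSql("CALL sys.compact_database('test_db', 'combined', '', '', '', '60s')");
    }
}
```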
diff --git a/docs/content/maintenance/dedicated-compaction.md
b/docs/content/maintenance/dedicated-compaction.md
index 981823540..f7eb03e1d 100644
--- a/docs/content/maintenance/dedicated-compaction.md
+++ b/docs/content/maintenance/dedicated-compaction.md
@@ -128,7 +128,7 @@ For more usage of the compact action, see
{{< /tab >}}
-{{< tab "Flink" >}}
+{{< tab "Flink SQL" >}}
Run the following sql:
@@ -178,7 +178,7 @@ You can run the following command to submit a compaction
job for multiple databa
* `--including_databases` is used to specify which databases are to be
compacted. In compact mode, you need to specify a single database name; in
compact_database mode, you can specify multiple databases, and regular
expressions are supported.
* `--including_tables` is used to specify which source tables are to be
compacted. You must use '|' to separate multiple tables; the format is
`databaseName.tableName`, and regular expressions are supported. For example,
specifying "--including_tables db1.t1|db2.+" means to compact table 'db1.t1'
and all tables in the db2 database.
* `--excluding_tables` is used to specify which source tables are not to be
compacted. The usage is the same as "--including_tables". "--excluding_tables"
takes precedence over "--including_tables" if you specify both.
-* `mode` is used to specify compaction mode. Possible values:
+* `--mode` is used to specify compaction mode. Possible values:
* "divided" (the default mode if you haven't specified one): start a sink
for each table, the compaction of the new table requires restarting the job.
* "combined": start a single combined sink for all tables, the new table
will be automatically compacted.
* `--catalog_conf` is the configuration for Paimon catalog. Each configuration
should be specified in the format `key=value`. See [here]({{< ref
"maintenance/configurations" >}}) for a complete list of catalog configurations.
@@ -243,7 +243,7 @@ For more usage of the compact_database action, see
{{< /tab >}}
-{{< tab "Flink" >}}
+{{< tab "Flink SQL" >}}
Run the following sql:
@@ -284,7 +284,7 @@ you can trigger a compact with specified column sort to
speed up queries.
--database <database-name> \
--table <table-name> \
--order_strategy <orderType> \
- --order_by <col1,col2,...>
+ --order_by <col1,col2,...> \
[--partition <partition-name>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf
<paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-dynamic-conf> [--table_conf
<paimon-table-dynamic-conf>] ...]
@@ -296,7 +296,7 @@ The sort parallelism is the same as the sink parallelism,
you can dynamically sp
{{< /tab >}}
-{{< tab "Flink" >}}
+{{< tab "Flink SQL" >}}
Run the following sql:
@@ -308,3 +308,108 @@ CALL sys.compact(`table` => 'default.T', order_strategy
=> 'zorder', order_by =>
{{< /tabs >}}
+## Historical Partition Compact
+
+You can run the following command to submit a compaction job for partitions
that have not received any new data for a period of time. Small files in those
partitions will be fully compacted.
+
+{{< hint info >}}
+
+This feature is currently only supported in batch mode.
+
+{{< /hint >}}
+
+### For Table
+
+This compacts historical partitions for a single table.
+{{< tabs "history-partition-compaction-job for table" >}}
+
+{{< tab "Flink Action Jar" >}}
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ -D execution.runtime-mode=batch \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ compact \
+ --warehouse <warehouse-path> \
+ --database <database-name> \
+ --table <table-name> \
+ --partition_idle_time <partition-idle-time> \
+ [--partition <partition-name>] \
+ [--catalog_conf <paimon-catalog-conf> [--catalog_conf
<paimon-catalog-conf> ...]] \
+ [--table_conf <paimon-table-dynamic-conf> [--table_conf
<paimon-table-dynamic-conf>] ...]
+```
+There is one new configuration for `Historical Partition Compact`:
+
+* `--partition_idle_time`: triggers a full compaction for partitions that have
not received any new data for 'partition_idle_time'. Only these partitions
will be compacted.
+
+{{< /tab >}}
+
+{{< tab "Flink SQL" >}}
+
+Run the following sql:
+
+```sql
+-- compact historical partitions of the table
+CALL sys.compact(`table` => 'default.T', partition_idle_time => '5s')
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+### For Databases
+
+This compacts historical partitions for multiple tables across different databases.
+{{< tabs "history-partition-compaction-job for databases" >}}
+
+{{< tab "Flink Action Jar" >}}
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ -D execution.runtime-mode=batch \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ compact_database \
+ --warehouse <warehouse-path> \
+ --including_databases <database-name|name-regular-expr> \
+ --partition_idle_time <partition-idle-time> \
+ [--including_tables <paimon-table-name|name-regular-expr>] \
+ [--excluding_tables <paimon-table-name|name-regular-expr>] \
+ [--mode <compact-mode>] \
+ [--catalog_conf <paimon-catalog-conf> [--catalog_conf
<paimon-catalog-conf> ...]] \
+ [--table_conf <paimon-table_conf> [--table_conf <paimon-table_conf> ...]]
+```
+
+Example: compact historical partitions for tables in a database
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ compact_database \
+ --warehouse s3:///path/to/warehouse \
+ --including_databases test_db \
+ --partition_idle_time 60s \
+ --catalog_conf s3.endpoint=https://****.com \
+ --catalog_conf s3.access-key=***** \
+ --catalog_conf s3.secret-key=*****
+```
+
+{{< /tab >}}
+
+{{< tab "Flink SQL" >}}
+
+Run the following sql:
+
+```sql
+-- compact historical partitions for tables in databases
+CALL sys.compact_database('includingDatabases', 'mode', 'includingTables',
'excludingTables', 'tableOptions', 'partition_idle_time')
+```
+
+Example: compact historical partitions for tables in a database
+
+```sql
+-- compact historical partitions for tables in the database
+CALL sys.compact_database('test_db', 'combined', '', '', '', '60s')
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
\ No newline at end of file
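The cutoff used throughout this change is simple: a partition counts as historical when the creation time of its newest file is older than now minus `partition_idle_time`. A minimal standalone sketch of that check, mirroring the logic added in this commit:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class IdlePartitionCheck {

    /** True if the partition's newest file is older than the idle-time cutoff. */
    static boolean isHistoryPartition(long lastFileCreationTime, Duration partitionIdleTime) {
        long historyMilli =
                LocalDateTime.now()
                        .minus(partitionIdleTime)
                        .atZone(ZoneId.systemDefault())
                        .toInstant()
                        .toEpochMilli();
        return lastFileCreationTime <= historyMilli;
    }

    public static void main(String[] args) {
        long oneHourAgo = System.currentTimeMillis() - 3_600_000L;
        System.out.println(isHistoryPartition(oneHourAgo, Duration.ofSeconds(60))); // true
    }
}
```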
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 9a53a79ee..d77163375 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -47,11 +47,13 @@ This section introduce all available spark procedures about
paimon.
<li>order_strategy: 'order' or 'zorder' or 'hilbert' or 'none'.
Leave empty for 'none'.</li>
<li>order_columns: the columns to sort by. Leave empty if
'order_strategy' is 'none'.</li>
<li>max_concurrent_jobs: when sort compact is used, files in one
partition are grouped and submitted as a single spark compact job. This
parameter controls the maximum number of jobs that can be submitted
simultaneously. The default value is 15.</li>
+ <li>partition_idle_time: triggers a full compaction for
partitions that have not received any new data for 'partition_idle_time'.
Only these partitions will be compacted. This argument cannot be used together
with order compact.</li>
</td>
<td>
SET spark.sql.shuffle.partitions=10; --set the compact parallelism
<br/><br/>
CALL sys.compact(table => 'T', partitions => 'p=0;p=1',
order_strategy => 'zorder', order_by => 'a,b') <br/><br/>
- CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy
=> 'zorder', order_by => 'a,b')
+ CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy
=> 'zorder', order_by => 'a,b') <br/><br/>
+ CALL sys.compact(table => 'T', partition_idle_time => '60s')
</td>
</tr>
<tr>
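From Spark application code, the same procedure can be invoked through `SparkSession.sql`; a minimal sketch, assuming the session is already configured with the Paimon Spark catalog and a table named `T`:

```java
import org.apache.spark.sql.SparkSession;

public class SparkCompactCall {
    public static void main(String[] args) {
        SparkSession spark =
                SparkSession.builder().appName("compact-idle-partitions").getOrCreate();
        // Full-compact only partitions of T with no new data for at least 60 seconds.
        spark.sql("CALL sys.compact(table => 'T', partition_idle_time => '60s')").show();
    }
}
```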
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index 2fcbc98ba..e6cbb299c 100644
---
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -22,7 +22,9 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.action.SortCompactAction;
import org.apache.paimon.utils.ParameterUtils;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.utils.TimeUtils;
import org.apache.flink.table.procedure.ProcedureContext;
@@ -96,6 +98,27 @@ public class CompactProcedure extends ProcedureBase {
String tableOptions,
String whereSql)
throws Exception {
+ return call(
+ procedureContext,
+ tableId,
+ partitions,
+ orderStrategy,
+ orderByColumns,
+ tableOptions,
+ whereSql,
+ "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String partitions,
+ String orderStrategy,
+ String orderByColumns,
+ String tableOptions,
+ String whereSql,
+ String partitionIdleTime)
+ throws Exception {
String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
@@ -114,8 +137,14 @@ public class CompactProcedure extends ProcedureBase {
identifier.getObjectName(),
catalogOptions,
tableConf);
+ if (!StringUtils.isBlank(partitionIdleTime)) {
+
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
+ }
jobName = "Compact Job";
} else if (!orderStrategy.isEmpty() && !orderByColumns.isEmpty()) {
+ Preconditions.checkArgument(
+ StringUtils.isBlank(partitionIdleTime),
+ "sort compact do not support 'partition_idle_time'.");
action =
new SortCompactAction(
warehouse,
diff --git
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index 1db04aa1d..cd9a8b184 100644
---
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -58,6 +58,9 @@ public class ProcedurePositionalArgumentsITCase extends
CatalogITCaseBase {
sql(
"CALL sys.compact('default.T', '', '',
'', 'sink.parallelism=1','pt=1')"))
.doesNotThrowAnyException();
+ assertThatCode(() -> sql("CALL sys.compact('default.T', '', 'zorder',
'k', '','','5s')"))
+ .message()
+ .contains("sort compact do not support
'partition_idle_time'.");
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 59ca818c3..3cb3513dd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -38,6 +38,9 @@ import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -54,6 +57,8 @@ public class CompactAction extends TableActionBase {
private String whereSql;
+ @Nullable private Duration partitionIdleTime = null;
+
public CompactAction(String warehouse, String database, String tableName) {
this(warehouse, database, tableName, Collections.emptyMap(),
Collections.emptyMap());
}
@@ -90,6 +95,11 @@ public class CompactAction extends TableActionBase {
return this;
}
+ public CompactAction withPartitionIdleTime(@Nullable Duration
partitionIdleTime) {
+ this.partitionIdleTime = partitionIdleTime;
+ return this;
+ }
+
@Override
public void build() throws Exception {
ReadableConfig conf = env.getConfiguration();
@@ -120,7 +130,11 @@ public class CompactAction extends TableActionBase {
sourceBuilder.withPartitionPredicate(getPredicate());
DataStreamSource<RowData> source =
-
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
+ sourceBuilder
+ .withEnv(env)
+ .withContinuousMode(isStreaming)
+ .withPartitionIdleTime(partitionIdleTime)
+ .build();
sinkBuilder.withInput(source).build();
}
@@ -132,6 +146,7 @@ public class CompactAction extends TableActionBase {
unawareBucketCompactionTopoBuilder.withPartitionPredicate(getPredicate());
unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
+
unawareBucketCompactionTopoBuilder.withPartitionIdleTime(partitionIdleTime);
unawareBucketCompactionTopoBuilder.build();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index c65de5c7c..f43c7a747 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -18,6 +18,9 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.TimeUtils;
+
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.List;
@@ -34,6 +37,8 @@ public class CompactActionFactory implements ActionFactory {
private static final String WHERE = "where";
+ private static final String PARTITION_IDLE_TIME = "partition_idle_time";
+
@Override
public String identifier() {
return IDENTIFIER;
@@ -47,6 +52,9 @@ public class CompactActionFactory implements ActionFactory {
CompactAction action;
if (params.has(ORDER_STRATEGY)) {
+ Preconditions.checkArgument(
+ !params.has(PARTITION_IDLE_TIME),
+ "sort compact do not support 'partition_idle_time'.");
action =
new SortCompactAction(
tablePath.f0,
@@ -64,6 +72,10 @@ public class CompactActionFactory implements ActionFactory {
tablePath.f2,
catalogConfig,
optionalConfigMap(params, TABLE_CONF));
+ if (params.has(PARTITION_IDLE_TIME)) {
+ action.withPartitionIdleTime(
+
TimeUtils.parseDuration(params.get(PARTITION_IDLE_TIME)));
+ }
}
if (params.has(PARTITION)) {
@@ -88,7 +100,8 @@ public class CompactActionFactory implements ActionFactory {
+ "--table <table_name> [--partition <partition_name>]"
+ "[--order_strategy <order_strategy>]"
+ "[--table_conf <key>=<value>]"
- + "[--order_by <order_columns>]");
+ + "[--order_by <order_columns>]"
+ + "[--partition_idle_time <partition_idle_time>]");
System.out.println(
" compact --warehouse s3://path/to/warehouse --database
<database_name> "
+ "--table <table_name> [--catalog_conf
<paimon_catalog_conf> [--catalog_conf <paimon_catalog_conf> ...]]");
@@ -103,6 +116,9 @@ public class CompactActionFactory implements ActionFactory {
System.out.println(
" order compact now only support append-only table with
bucket=-1, please don't specify --order_strategy parameter if your table does
not meet the request");
System.out.println(" order_strategy now only support zorder in batch
mode");
+ System.out.println(
+ "partition_idle_time now can not be used with order_strategy,
please don't specify --partition_idle_time when use order compact");
+ System.out.println(" partition_idle_time now is only supported in
batch mode");
System.out.println();
System.out.println("Examples:");
@@ -113,6 +129,9 @@ public class CompactActionFactory implements ActionFactory {
System.out.println(
" compact --warehouse hdfs:///path/to/warehouse --database
test_db --table test_table "
+ "--partition dt=20221126,hh=08 --partition
dt=20221127,hh=09");
+ System.out.println(
+ " compact --warehouse hdfs:///path/to/warehouse --database
test_db --table test_table "
+ + "--partition_idle_time 10s");
System.out.println(
" compact --warehouse s3:///path/to/warehouse "
+ "--database test_db "
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
index 5de14768a..7c5854e2d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -68,6 +69,8 @@ public class CompactDatabaseAction extends ActionBase {
private Options tableOptions = new Options();
+ @Nullable private Duration partitionIdleTime = null;
+
public CompactDatabaseAction(String warehouse, Map<String, String>
catalogConfig) {
super(warehouse, catalogConfig);
}
@@ -101,6 +104,11 @@ public class CompactDatabaseAction extends ActionBase {
return this;
}
+ public CompactDatabaseAction withPartitionIdleTime(@Nullable Duration
partitionIdleTime) {
+ this.partitionIdleTime = partitionIdleTime;
+ return this;
+ }
+
private boolean shouldCompactionTable(String paimonFullTableName) {
boolean shouldCompaction =
includingPattern.matcher(paimonFullTableName).matches();
if (excludingPattern != null) {
@@ -191,11 +199,14 @@ public class CompactDatabaseAction extends ActionBase {
conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING;
CombinedTableCompactorSourceBuilder sourceBuilder =
new CombinedTableCompactorSourceBuilder(
- catalogLoader(),
- databasePattern,
- includingPattern,
- excludingPattern,
-
tableOptions.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis());
+ catalogLoader(),
+ databasePattern,
+ includingPattern,
+ excludingPattern,
+ tableOptions
+
.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL)
+ .toMillis())
+ .withPartitionIdleTime(partitionIdleTime);
// multi bucket table which has multi bucket in a partition like fix
bucket and dynamic
// bucket
@@ -225,7 +236,9 @@ public class CompactDatabaseAction extends ActionBase {
FileStoreTable table,
boolean isStreaming) {
- CompactorSourceBuilder sourceBuilder = new
CompactorSourceBuilder(fullName, table);
+ CompactorSourceBuilder sourceBuilder =
+ new CompactorSourceBuilder(fullName, table)
+ .withPartitionIdleTime(partitionIdleTime);
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
DataStreamSource<RowData> source =
@@ -242,6 +255,7 @@ public class CompactDatabaseAction extends ActionBase {
new UnawareBucketCompactionTopoBuilder(env, fullName, table);
unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
+
unawareBucketCompactionTopoBuilder.withPartitionIdleTime(partitionIdleTime);
unawareBucketCompactionTopoBuilder.build();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
index a7b761044..b26870907 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.utils.TimeUtils;
+
import java.util.Optional;
/** Factory to create {@link CompactDatabaseAction}. */
@@ -29,6 +31,7 @@ public class CompactDatabaseActionFactory implements
ActionFactory {
private static final String INCLUDING_TABLES = "including_tables";
private static final String EXCLUDING_TABLES = "excluding_tables";
private static final String MODE = "mode";
+ private static final String PARTITION_IDLE_TIME = "partition_idle_time";
@Override
public String identifier() {
@@ -47,6 +50,10 @@ public class CompactDatabaseActionFactory implements
ActionFactory {
.excludingTables(params.get(EXCLUDING_TABLES))
.withDatabaseCompactMode(params.get(MODE))
.withTableOptions(optionalConfigMap(params, TABLE_CONF));
+ String partitionIdleTime = params.get(PARTITION_IDLE_TIME);
+ if (partitionIdleTime != null) {
+
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
+ }
return Optional.of(action);
}
@@ -59,14 +66,20 @@ public class CompactDatabaseActionFactory implements
ActionFactory {
System.out.println("Syntax:");
System.out.println(
- " compact_database --warehouse <warehouse_path> --database
<database_name> "
+ " compact_database --warehouse <warehouse_path>
--including_databases <database-name|name-regular-expr> "
+ "[--including_tables
<paimon_table_name|name_regular_expr>] "
- + "[--excluding_tables
<paimon_table_name|name_regular_expr>] ");
+ + "[--excluding_tables
<paimon_table_name|name_regular_expr>] "
+ + "[--mode <compact_mode>]"
+ + "[--partition_idle_time <partition_idle_time>]");
System.out.println(
- " compact_database --warehouse s3://path/to/warehouse
--database <database_name> "
+ " compact_database --warehouse s3://path/to/warehouse
--including_databases <database-name|name-regular-expr> "
+ "[--catalog_conf <paimon_catalog_conf>
[--catalog_conf <paimon_catalog_conf> ...]]");
System.out.println();
+ System.out.println(
+ "--including_databases is used to specify which databases are
to be compacted. "
+ + "You must use '|' to separate multiple databases,
Regular expression is supported.");
+
System.out.println(
"--including_tables is used to specify which source tables are
to be compacted. "
+ "You must use '|' to separate multiple tables, the
format is `databaseName.tableName`, Regular expression is supported.");
@@ -75,14 +88,19 @@ public class CompactDatabaseActionFactory implements
ActionFactory {
+ "The usage is same as --including_tables.");
System.out.println(
"--excluding_tables has higher priority than
--including_tables if you specified both.");
+ System.out.println(
+ "--mode is used to specify compaction mode. Possible values:
divided, combined.");
+ System.out.println(
+ "--partition_idle_time is used to do a full compaction for
partition which had not receive any new data for 'partition_idle_time' time.
And only these partitions will be compacted.");
+ System.out.println("--partition_idle_time is only supported in batch
mode. ");
System.out.println();
System.out.println("Examples:");
System.out.println(
- " compact_database --warehouse hdfs:///path/to/warehouse
--database test_db");
+ " compact_database --warehouse hdfs:///path/to/warehouse
--including_databases test_db");
System.out.println(
" compact_database --warehouse s3:///path/to/warehouse "
- + "--database test_db "
+ + "--including_databases test_db "
+ "--catalog_conf s3.endpoint=https://****.com "
+ "--catalog_conf s3.access-key=***** "
+ "--catalog_conf s3.secret-key=***** ");
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
index 3efb58a7f..8c6ed4c9f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -19,21 +19,32 @@
package org.apache.paimon.flink.compact;
import org.apache.paimon.append.UnawareAppendCompactionTask;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
import org.apache.paimon.flink.source.BucketUnawareCompactSource;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import javax.annotation.Nullable;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
/**
* Build for unaware-bucket table flink compaction job.
*
@@ -52,6 +63,7 @@ public class UnawareBucketCompactionTopoBuilder {
private boolean isContinuous = false;
@Nullable private Predicate partitionPredicate;
+ @Nullable private Duration partitionIdleTime = null;
public UnawareBucketCompactionTopoBuilder(
StreamExecutionEnvironment env, String tableIdentifier,
FileStoreTable table) {
@@ -68,15 +80,47 @@ public class UnawareBucketCompactionTopoBuilder {
this.partitionPredicate = predicate;
}
+ public void withPartitionIdleTime(@Nullable Duration partitionIdleTime) {
+ this.partitionIdleTime = partitionIdleTime;
+ }
+
public void build() {
// build source from UnawareSourceFunction
DataStreamSource<UnawareAppendCompactionTask> source = buildSource();
+ if (isContinuous) {
+ Preconditions.checkArgument(
+ partitionIdleTime == null, "Streaming mode does not
support partitionIdleTime");
+ } else if (partitionIdleTime != null) {
+ Map<BinaryRow, Long> partitionInfo = getPartitionInfo(table);
+ long historyMilli =
+ LocalDateTime.now()
+ .minus(partitionIdleTime)
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+ SingleOutputStreamOperator<UnawareAppendCompactionTask>
filterStream =
+ source.filter(
+ task -> {
+ BinaryRow partition = task.partition();
+ return partitionInfo.get(partition) <=
historyMilli;
+ });
+ source = new DataStreamSource<>(filterStream);
+ }
// from source, construct the full flink job
sinkFromSource(source);
}
+ private Map<BinaryRow, Long> getPartitionInfo(FileStoreTable table) {
+ List<PartitionEntry> partitions =
table.newSnapshotReader().partitionEntries();
+ return partitions.stream()
+ .collect(
+ Collectors.toMap(
+ PartitionEntry::partition,
PartitionEntry::lastFileCreationTime));
+ }
+
private DataStreamSource<UnawareAppendCompactionTask> buildSource() {
+
long scanInterval =
table.coreOptions().continuousDiscoveryInterval().toMillis();
BucketUnawareCompactSource source =
new BucketUnawareCompactSource(
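For orientation, a sketch of how `CompactAction` drives this builder in batch mode; the environment, table identifier, and table are assumed to come from the surrounding job setup:

```java
import java.time.Duration;

import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class TopoBuilderSketch {
    static void build(StreamExecutionEnvironment env, String fullName, FileStoreTable table) {
        UnawareBucketCompactionTopoBuilder builder =
                new UnawareBucketCompactionTopoBuilder(env, fullName, table);
        // Batch only: build() rejects a non-null partitionIdleTime in streaming mode.
        builder.withContinuousMode(false);
        builder.withPartitionIdleTime(Duration.ofSeconds(60));
        builder.build();
    }
}
```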
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
index 4fea9b635..755118941 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.flink.action.CompactDatabaseAction;
import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.utils.TimeUtils;
import org.apache.flink.table.procedure.ProcedureContext;
@@ -98,6 +99,25 @@ public class CompactDatabaseProcedure extends ProcedureBase {
String excludingTables,
String tableOptions)
throws Exception {
+ return call(
+ procedureContext,
+ includingDatabases,
+ mode,
+ includingTables,
+ excludingTables,
+ tableOptions,
+ "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String includingDatabases,
+ String mode,
+ String includingTables,
+ String excludingTables,
+ String tableOptions,
+ String partitionIdleTime)
+ throws Exception {
String warehouse = catalog.warehouse();
Map<String, String> catalogOptions = catalog.options();
CompactDatabaseAction action =
@@ -109,6 +129,9 @@ public class CompactDatabaseProcedure extends ProcedureBase
{
if (!StringUtils.isBlank(tableOptions)) {
action.withTableOptions(parseCommaSeparatedKeyValues(tableOptions));
}
+ if (!StringUtils.isBlank(partitionIdleTime)) {
+
action.withPartitionIdleTime(TimeUtils.parseDuration(partitionIdleTime));
+ }
return execute(procedureContext, action, "Compact database job");
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
index 2159cbe9e..384cdcdce 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
@@ -34,6 +34,9 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
import java.util.regex.Pattern;
/**
@@ -49,6 +52,7 @@ public class CombinedTableCompactorSourceBuilder {
private boolean isContinuous = false;
private StreamExecutionEnvironment env;
+ @Nullable private Duration partitionIdleTime = null;
public CombinedTableCompactorSourceBuilder(
Catalog.Loader catalogLoader,
@@ -73,6 +77,12 @@ public class CombinedTableCompactorSourceBuilder {
return this;
}
+ public CombinedTableCompactorSourceBuilder withPartitionIdleTime(
+ @Nullable Duration partitionIdleTime) {
+ this.partitionIdleTime = partitionIdleTime;
+ return this;
+ }
+
public DataStream<RowData> buildAwareBucketTableSource() {
Preconditions.checkArgument(env != null, "StreamExecutionEnvironment
should not be null.");
RowType produceType = BucketsTable.getRowType();
@@ -94,7 +104,8 @@ public class CombinedTableCompactorSourceBuilder {
catalogLoader,
includingPattern,
excludingPattern,
- databasePattern);
+ databasePattern,
+ partitionIdleTime);
}
}
@@ -116,7 +127,8 @@ public class CombinedTableCompactorSourceBuilder {
catalogLoader,
includingPattern,
excludingPattern,
- databasePattern);
+ databasePattern,
+ partitionIdleTime);
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index ee531a2e2..10fc3b558 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -19,26 +19,37 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.system.BucketsTable;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import javax.annotation.Nullable;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
/**
* Source builder to build a Flink {@link StaticFileStoreSource} or {@link
@@ -52,6 +63,7 @@ public class CompactorSourceBuilder {
private boolean isContinuous = false;
private StreamExecutionEnvironment env;
@Nullable private Predicate partitionPredicate = null;
+ @Nullable private Duration partitionIdleTime = null;
public CompactorSourceBuilder(String tableIdentifier, FileStoreTable
table) {
this.tableIdentifier = tableIdentifier;
@@ -68,6 +80,11 @@ public class CompactorSourceBuilder {
return this;
}
+ public CompactorSourceBuilder withPartitionIdleTime(@Nullable Duration
partitionIdleTime) {
+ this.partitionIdleTime = partitionIdleTime;
+ return this;
+ }
+
private Source<RowData, ?, ?> buildSource(BucketsTable bucketsTable) {
if (isContinuous) {
@@ -101,6 +118,25 @@ public class CompactorSourceBuilder {
WatermarkStrategy.noWatermarks(),
tableIdentifier + "-compact-source",
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)));
+ if (isContinuous) {
+ Preconditions.checkArgument(
+ partitionIdleTime == null, "Streaming mode does not
support partitionIdleTime");
+ } else if (partitionIdleTime != null) {
+ Map<BinaryRow, Long> partitionInfo =
getPartitionInfo(bucketsTable);
+ long historyMilli =
+ LocalDateTime.now()
+ .minus(partitionIdleTime)
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+ SingleOutputStreamOperator<RowData> filterStream =
+ dataStream.filter(
+ rowData -> {
+ BinaryRow partition =
deserializeBinaryRow(rowData.getBinary(1));
+ return partitionInfo.get(partition) <=
historyMilli;
+ });
+ dataStream = new DataStreamSource<>(filterStream);
+ }
Integer parallelism =
Options.fromMap(table.options()).get(FlinkConnectorOptions.SCAN_PARALLELISM);
if (parallelism != null) {
@@ -138,4 +174,13 @@ public class CompactorSourceBuilder {
this.partitionPredicate = partitionPredicate;
return this;
}
+
+ private Map<BinaryRow, Long> getPartitionInfo(BucketsTable table) {
+ List<PartitionEntry> partitions =
table.newSnapshotReader().partitionEntries();
+
+ return partitions.stream()
+ .collect(
+ Collectors.toMap(
+ PartitionEntry::partition,
PartitionEntry::lastFileCreationTime));
+ }
}
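The single-table path is analogous: `CompactAction` chains the new setter into the source build. A sketch under the same assumptions as above:

```java
import java.time.Duration;

import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

class CompactorSourceSketch {
    static DataStreamSource<RowData> buildBatchSource(
            StreamExecutionEnvironment env, String fullName, FileStoreTable table) {
        // partitionIdleTime requires batch mode (continuous mode = false).
        return new CompactorSourceBuilder(fullName, table)
                .withEnv(env)
                .withContinuousMode(false)
                .withPartitionIdleTime(Duration.ofSeconds(60))
                .build();
    }
}
```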
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
index 22ef33029..cee6081aa 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
import java.util.regex.Pattern;
import static
org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.FINISHED;
@@ -98,7 +99,8 @@ public class CombinedAwareBatchSourceFunction
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
- Pattern databasePattern) {
+ Pattern databasePattern,
+ Duration partitionIdleTime) {
CombinedAwareBatchSourceFunction function =
new CombinedAwareBatchSourceFunction(
catalogLoader, includingPattern, excludingPattern,
databasePattern);
@@ -112,7 +114,10 @@ public class CombinedAwareBatchSourceFunction
.partitionCustom(
(key, numPartitions) -> key % numPartitions,
split -> ((DataSplit) split.f0).bucket())
- .transform(name, typeInfo, new
MultiTablesReadOperator(catalogLoader, false));
+ .transform(
+ name,
+ typeInfo,
+ new MultiTablesReadOperator(catalogLoader, false,
partitionIdleTime));
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
index 67579f2d0..b0b5a7784 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
@@ -20,9 +20,14 @@ package org.apache.paimon.flink.source.operator;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.compact.MultiTableScanBase;
import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan;
import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
@@ -36,7 +41,13 @@ import
org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import static
org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.FINISHED;
import static
org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.IS_EMPTY;
@@ -96,7 +107,8 @@ public class CombinedUnawareBatchSourceFunction
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
- Pattern databasePattern) {
+ Pattern databasePattern,
+ @Nullable Duration partitionIdleTime) {
CombinedUnawareBatchSourceFunction function =
new CombinedUnawareBatchSourceFunction(
catalogLoader, includingPattern, excludingPattern,
databasePattern);
@@ -115,6 +127,14 @@ public class CombinedUnawareBatchSourceFunction
Boundedness.BOUNDED)
.forceNonParallel();
+ if (partitionIdleTime != null) {
+ source =
+ source.transform(
+ name,
+ compactionTaskTypeInfo,
+ new MultiUnawareTablesReadOperator(catalogLoader,
partitionIdleTime));
+ }
+
PartitionTransformation<MultiTableUnawareAppendCompactionTask>
transformation =
new PartitionTransformation<>(
source.getTransformation(), new
RebalancePartitioner<>());
@@ -122,6 +142,38 @@ public class CombinedUnawareBatchSourceFunction
return new DataStream<>(env, transformation);
}
+ private static Long getPartitionInfo(
+ Identifier tableIdentifier,
+ BinaryRow partition,
+ Map<Identifier, Map<BinaryRow, Long>> multiTablesPartitionInfo,
+ Catalog catalog) {
+ Map<BinaryRow, Long> partitionInfo =
multiTablesPartitionInfo.get(tableIdentifier);
+ if (partitionInfo == null) {
+ try {
+ Table table = catalog.getTable(tableIdentifier);
+ if (!(table instanceof FileStoreTable)) {
+ LOGGER.error(
+ String.format(
+ "Only FileStoreTable supports compact
action. The table type is '%s'.",
+ table.getClass().getName()));
+ }
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ List<PartitionEntry> partitions =
+ fileStoreTable.newSnapshotReader().partitionEntries();
+ partitionInfo =
+ partitions.stream()
+ .collect(
+ Collectors.toMap(
+ PartitionEntry::partition,
+
PartitionEntry::lastFileCreationTime));
+ multiTablesPartitionInfo.put(tableIdentifier, partitionInfo);
+ } catch (Catalog.TableNotExistException e) {
+ LOGGER.error(String.format("table: %s not found.",
tableIdentifier.getFullName()));
+ }
+ }
+ return partitionInfo.get(partition);
+ }
+
@Override
public void close() throws Exception {
super.close();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
index d43245212..0b48a9bcc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
@@ -20,9 +20,11 @@ package org.apache.paimon.flink.source.operator;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
@@ -37,10 +39,16 @@ import
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static
org.apache.paimon.flink.utils.MultiTablesCompactorUtil.compactOptions;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
/**
* The operator that reads the Tuple2<{@link Split}, String> received from the
preceding {@link
@@ -56,11 +64,20 @@ public class MultiTablesReadOperator extends
AbstractStreamOperator<RowData>
private final Catalog.Loader catalogLoader;
private final boolean isStreaming;
+ private Duration partitionIdleTime = null;
+
public MultiTablesReadOperator(Catalog.Loader catalogLoader, boolean
isStreaming) {
this.catalogLoader = catalogLoader;
this.isStreaming = isStreaming;
}
+ public MultiTablesReadOperator(
+ Catalog.Loader catalogLoader, boolean isStreaming, Duration
partitionIdleTime) {
+ this.catalogLoader = catalogLoader;
+ this.isStreaming = isStreaming;
+ this.partitionIdleTime = partitionIdleTime;
+ }
+
private transient Catalog catalog;
private transient IOManager ioManager;
private transient Map<Identifier, BucketsTable> tablesMap;
@@ -83,17 +100,33 @@ public class MultiTablesReadOperator extends
AbstractStreamOperator<RowData>
this.reuseRow = new FlinkRowData(null);
this.reuseRecord = new StreamRecord<>(reuseRow);
+
+ if (isStreaming) {
+ Preconditions.checkArgument(
+ partitionIdleTime == null, "Streaming mode does not
support partitionIdleTime");
+ }
}
@Override
public void processElement(StreamRecord<Tuple2<Split, String>> record)
throws Exception {
Identifier identifier = Identifier.fromString(record.getValue().f1);
TableRead read = getTableRead(identifier);
+ Map<BinaryRow, Long> partitionInfo =
getPartitionInfo(tablesMap.get(identifier));
try (CloseableIterator<InternalRow> iterator =
read.createReader(record.getValue().f0).toCloseableIterator())
{
- while (iterator.hasNext()) {
- reuseRow.replace(iterator.next());
- output.collect(reuseRecord);
+ if (partitionIdleTime == null) {
+ while (iterator.hasNext()) {
+ reuseRow.replace(iterator.next());
+ output.collect(reuseRecord);
+ }
+ } else {
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next();
+ if (checkIsHistoryPartition(row, partitionInfo)) {
+ reuseRow.replace(row);
+ output.collect(reuseRecord);
+ }
+ }
}
}
}
@@ -123,6 +156,26 @@ public class MultiTablesReadOperator extends
AbstractStreamOperator<RowData>
return readsMap.get(tableId);
}
+ private Map<BinaryRow, Long> getPartitionInfo(BucketsTable table) {
+ List<PartitionEntry> partitions =
table.newSnapshotReader().partitionEntries();
+
+ return partitions.stream()
+ .collect(
+ Collectors.toMap(
+ PartitionEntry::partition,
PartitionEntry::lastFileCreationTime));
+ }
+
+ private boolean checkIsHistoryPartition(InternalRow row, Map<BinaryRow,
Long> partitionInfo) {
+ BinaryRow partition = deserializeBinaryRow(row.getBinary(1));
+ long historyMilli =
+ LocalDateTime.now()
+ .minus(partitionIdleTime)
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+ return partitionInfo.get(partition) <= historyMilli;
+ }
+
@Override
public void close() throws Exception {
super.close();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
new file mode 100644
index 000000000..4dba03cf4
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The operator is used for historical partition compaction. It reads {@link
+ * MultiTableUnawareAppendCompactionTask} received from the preceding {@link
+ * CombinedUnawareBatchSourceFunction} and filters out partitions that are not historical.
+ */
+public class MultiUnawareTablesReadOperator
+ extends AbstractStreamOperator<MultiTableUnawareAppendCompactionTask>
+ implements OneInputStreamOperator<
+ MultiTableUnawareAppendCompactionTask,
MultiTableUnawareAppendCompactionTask> {
+ private static final long serialVersionUID = 1L;
+
+ private final Catalog.Loader catalogLoader;
+
+ private final Duration partitionIdleTime;
+
+ public MultiUnawareTablesReadOperator(
+ Catalog.Loader catalogLoader, Duration partitionIdleTime) {
+ this.catalogLoader = catalogLoader;
+ this.partitionIdleTime = partitionIdleTime;
+ }
+
+ private transient Catalog catalog;
+ private transient Map<Identifier, FileStoreTable> tablesMap;
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ tablesMap = new HashMap<>();
+ catalog = catalogLoader.load();
+ }
+
+ @Override
+ public void
processElement(StreamRecord<MultiTableUnawareAppendCompactionTask> record) {
+ Identifier identifier = record.getValue().tableIdentifier();
+ BinaryRow partition = record.getValue().partition();
+ FileStoreTable table = getTable(identifier);
+ Map<BinaryRow, Long> partitionInfo = getPartitionInfo(table);
+ if (checkIsHistoryPartition(partition, partitionInfo)) {
+ output.collect(record);
+ }
+ }
+
+ private FileStoreTable getTable(Identifier tableId) {
+ FileStoreTable table = tablesMap.get(tableId);
+ if (table == null) {
+ try {
+ Table newTable = catalog.getTable(tableId);
+ Preconditions.checkArgument(
+ newTable instanceof FileStoreTable,
+ "Only FileStoreTable supports compact action. The
table type is '%s'.",
+ newTable.getClass().getName());
+ table = (FileStoreTable) newTable;
+ tablesMap.put(tableId, table);
+ } catch (Catalog.TableNotExistException e) {
+ LOG.error(String.format("table: %s not found.",
tableId.getFullName()));
+ }
+ }
+
+ return table;
+ }
+
+ private Map<BinaryRow, Long> getPartitionInfo(FileStoreTable table) {
+ List<PartitionEntry> partitions =
table.newSnapshotReader().partitionEntries();
+ return partitions.stream()
+ .collect(
+ Collectors.toMap(
+ PartitionEntry::partition,
PartitionEntry::lastFileCreationTime));
+ }
+
+ private boolean checkIsHistoryPartition(
+ BinaryRow partition, Map<BinaryRow, Long> partitionInfo) {
+ long historyMilli =
+ LocalDateTime.now()
+ .minus(partitionIdleTime)
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+ return partitionInfo.get(partition) <= historyMilli;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (catalog != null) {
+ catalog.close();
+ }
+ }
+}
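This operator only enters the combined batch topology when an idle time is configured, as the `CombinedUnawareBatchSourceFunction` hunk above shows; condensed here with the diff's own variable names:

```java
// Condensed from CombinedUnawareBatchSourceFunction#buildSource:
// without partition_idle_time the task stream passes through unfiltered.
if (partitionIdleTime != null) {
    source =
            source.transform(
                    name,
                    compactionTaskTypeInfo,
                    new MultiUnawareTablesReadOperator(catalogLoader, partitionIdleTime));
}
```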
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java
index fc648b468..27264eb80 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java
@@ -62,6 +62,20 @@ public class MultiTablesCompactorUtil {
}
}
+ public static Map<String, String> partitionCompactOptions() {
+
+ return new HashMap<String, String>() {
+ {
+ put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null);
+ put(CoreOptions.SCAN_TIMESTAMP.key(), null);
+ put(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS.key(), null);
+ put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null);
+ put(CoreOptions.SCAN_MODE.key(),
CoreOptions.StartupMode.LATEST_FULL.toString());
+ put(CoreOptions.WRITE_ONLY.key(), "false");
+ }
+ };
+ }
+
public static boolean shouldCompactTable(
Identifier tableIdentifier, Pattern includingPattern, Pattern
excludingPattern) {
String paimonFullTableName = tableIdentifier.getFullName();
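`partitionCompactOptions` unsets any user scan overrides, forces a latest-full scan, and re-enables writes so the compactor can rewrite files. A hedged sketch of applying it; `FileStoreTable.copy(Map)` is Paimon's usual mechanism for dynamic options, though this hunk does not show the call site:

```java
import java.util.Map;

import org.apache.paimon.flink.utils.MultiTablesCompactorUtil;
import org.apache.paimon.table.FileStoreTable;

class PartitionCompactOptionsSketch {
    static FileStoreTable prepareForPartitionCompact(FileStoreTable table) {
        // Null-valued entries clear earlier overrides; write-only=false allows compaction.
        Map<String, String> options = MultiTablesCompactorUtil.partitionCompactOptions();
        return table.copy(options);
    }
}
```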
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 9094239c9..ba95113d9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -39,6 +39,8 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
import java.util.ArrayList;
@@ -169,6 +171,79 @@ public class CompactActionITCase extends
CompactActionITCaseBase {
String.format("Cannot validate snapshot expiration in %s
milliseconds.", 60_000));
}
+ @ParameterizedTest(name = "mode = {0}")
+ @ValueSource(booleans = {true, false})
+ @Timeout(60)
+ public void testHistoryPartitionCompact(boolean mode) throws Exception {
+ String partitionIdleTime = "5s";
+ FileStoreTable table;
+ if (mode) {
+ table =
+ prepareTable(
+ Arrays.asList("dt", "hh"),
+ Arrays.asList("dt", "hh", "k"),
+ Collections.emptyList(),
+
Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
+ } else {
+ // for unaware bucket table
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+ tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+ tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+
+ table =
+ prepareTable(
+ Arrays.asList("dt", "hh"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ tableOptions);
+ }
+
+ writeData(
+ rowData(1, 100, 15, BinaryString.fromString("20221208")),
+ rowData(1, 100, 16, BinaryString.fromString("20221208")),
+ rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+ writeData(
+ rowData(2, 100, 15, BinaryString.fromString("20221208")),
+ rowData(2, 100, 16, BinaryString.fromString("20221208")),
+ rowData(2, 100, 15, BinaryString.fromString("20221209")));
+
+ Thread.sleep(5000);
+ writeData(rowData(3, 100, 16, BinaryString.fromString("20221208")));
+ checkLatestSnapshot(table, 3, Snapshot.CommitKind.APPEND);
+
+ CompactAction action =
+ createAction(
+ CompactAction.class,
+ "compact",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--partition_idle_time",
+ partitionIdleTime);
+        StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
+ action.withStreamExecutionEnvironment(env).build();
+ env.execute();
+
+ checkLatestSnapshot(table, 4, Snapshot.CommitKind.COMPACT);
+
+ List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+ assertThat(splits.size()).isEqualTo(3);
+ for (DataSplit split : splits) {
+ if (split.partition().getInt(1) == 15) {
+ // compacted
+ assertThat(split.dataFiles().size()).isEqualTo(1);
+ } else {
+ // not compacted
+ assertThat(split.dataFiles().size()).isEqualTo(3);
+ }
+ }
+ }
+
@Test
public void testUnawareBucketStreamingCompact() throws Exception {
Map<String, String> tableOptions = new HashMap<>();
@@ -294,6 +369,39 @@ public class CompactActionITCase extends CompactActionITCaseBase {
                 .hasMessage("Only parition key can be specialized in compaction action.");
}
+ @Test
+ public void testWrongUsage() throws Exception {
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+
+ prepareTable(
+ Collections.singletonList("v"),
+ Arrays.asList(),
+ Collections.emptyList(),
+ tableOptions);
+
+        // partition_idle_time cannot be used with order_strategy
+ Assertions.assertThatThrownBy(
+ () ->
+ createAction(
+ CompactAction.class,
+ "compact",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--partition_idle_time",
+ "5s",
+ "--order_strategy",
+ "zorder",
+ "--order_by",
+ "dt,hh"))
+                .hasMessage("sort compact do not support 'partition_idle_time'.");
+ }
+
private FileStoreTable prepareTable(
List<String> partitionKeys,
List<String> primaryKeys,
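
Note on usage: partition_idle_time selects only partitions that have received no
new data for at least the given duration and triggers a full compaction for
them; fresh partitions are left alone, as the assertions above show. A sketch of
the submission path, reusing the harness of this test class (warehouse, database
and tableName are placeholders; '1 d' is an illustrative threshold):

    CompactAction action =
            createAction(
                    CompactAction.class,
                    "compact",
                    "--warehouse", warehouse,
                    "--database", database,
                    "--table", tableName,
                    "--partition_idle_time", "1 d");
    StreamExecutionEnvironment env =
            streamExecutionEnvironmentBuilder().batchMode().build();
    action.withStreamExecutionEnvironment(env).build();
    env.execute();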
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index 80b8e3798..c5008b216 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -500,6 +500,107 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
}
}
+ @ParameterizedTest(name = "mode = {0}")
+ @ValueSource(strings = {"divided", "combined"})
+ @Timeout(60)
+ public void testHistoryPartitionCompact(String mode) throws Exception {
+ List<FileStoreTable> tables = new ArrayList<>();
+ String partitionIdleTime = "10s";
+
+ for (String dbName : DATABASE_NAMES) {
+ for (String tableName : TABLE_NAMES) {
+ Map<String, String> option = new HashMap<>();
+ option.put(CoreOptions.WRITE_ONLY.key(), "true");
+ List<String> keys;
+ if (tableName.endsWith("unaware_bucket")) {
+ option.put("bucket", "-1");
+ option.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+ option.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+ keys = Lists.newArrayList();
+ } else {
+ option.put("bucket", "1");
+ keys = Arrays.asList("dt", "hh", "k");
+ }
+                FileStoreTable table =
+                        createTable(dbName, tableName, Arrays.asList("dt", "hh"), keys, option);
+ tables.add(table);
+ SnapshotManager snapshotManager = table.snapshotManager();
+                StreamWriteBuilder streamWriteBuilder =
+                        table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = streamWriteBuilder.newWrite();
+ commit = streamWriteBuilder.newCommit();
+
+                writeData(
+                        rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                        rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                        rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+                writeData(
+                        rowData(2, 100, 15, BinaryString.fromString("20221208")),
+                        rowData(2, 100, 16, BinaryString.fromString("20221208")),
+                        rowData(2, 100, 15, BinaryString.fromString("20221209")));
+
+                Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+                assertThat(snapshot.id()).isEqualTo(2);
+                assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+ write.close();
+ commit.close();
+ }
+ }
+
+        // sleep 10s, then update partition 20221208-16
+ Thread.sleep(10000);
+ for (FileStoreTable table : tables) {
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = streamWriteBuilder.newWrite();
+ commit = streamWriteBuilder.newCommit();
+            writeData(rowData(3, 100, 16, BinaryString.fromString("20221208")));
+ }
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder().batchMode().build();
+ createAction(
+ CompactDatabaseAction.class,
+ "compact_database",
+ "--warehouse",
+ warehouse,
+ "--mode",
+ mode,
+ "--partition_idle_time",
+ partitionIdleTime)
+ .withStreamExecutionEnvironment(env)
+ .build();
+ env.execute();
+ } else {
+ callProcedure(
+ String.format(
+                            "CALL sys.compact_database('', '%s','','','','%s')",
+ mode, partitionIdleTime),
+ false,
+ true);
+ }
+
+ for (FileStoreTable table : tables) {
+ SnapshotManager snapshotManager = table.snapshotManager();
+            Snapshot snapshot =
+                    table.snapshotManager().snapshot(snapshotManager.latestSnapshotId());
+ assertThat(snapshot.id()).isEqualTo(4);
+            assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+
+            List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+ assertThat(splits.size()).isEqualTo(3);
+ for (DataSplit split : splits) {
+ if (split.partition().getInt(1) == 16) {
+ assertThat(split.dataFiles().size()).isEqualTo(3);
+ } else {
+ assertThat(split.dataFiles().size()).isEqualTo(1);
+ }
+ }
+ }
+ }
+
@ParameterizedTest(name = "mode = {0}")
@ValueSource(strings = {"divided", "combined"})
@Timeout(60)
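
As the test shows, the database-wide procedure takes partitionIdleTime as a new
sixth positional argument. A minimal sketch from a Flink SQL entry point
(assumes the session's current catalog is a Paimon catalog; the empty strings
keep includingTables, excludingTables and tableOptions at their defaults):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    // Compact partitions idle for more than one hour, across all databases.
    tEnv.executeSql("CALL sys.compact_database('', 'combined', '', '', '', '1 h')");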
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index f7db0dfcf..9afd04c8d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -51,6 +51,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -287,6 +288,54 @@ public class CompactorSourceITCase extends AbstractTestBase {
it.close();
}
+ @ParameterizedTest(name = "defaultOptions = {0}")
+ @ValueSource(booleans = {true, false})
+    public void testHistoryPartitionRead(boolean defaultOptions) throws Exception {
+ Duration partitionIdleTime = Duration.ofMillis(3000);
+ FileStoreTable table = createFileStoreTable();
+ if (!defaultOptions) {
+            // change options to test whether CompactorSourceBuilder works normally
+            table = table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2"));
+ }
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
+
+ write.write(rowData(1, 1510, BinaryString.fromString("20221208"), 15));
+ write.write(rowData(1, 1620, BinaryString.fromString("20221208"), 16));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(2, 1511, BinaryString.fromString("20221208"), 15));
+ write.write(rowData(2, 1510, BinaryString.fromString("20221209"), 15));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ Thread.sleep(3000);
+ write.write(rowData(3, 1510, BinaryString.fromString("20221208"), 16));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder().streamingMode().build();
+ DataStreamSource<RowData> compactorSource =
+ new CompactorSourceBuilder("test", table)
+ .withContinuousMode(false)
+ .withPartitionIdleTime(partitionIdleTime)
+ .withEnv(env)
+ .build();
+ CloseableIterator<RowData> it = compactorSource.executeAndCollect();
+
+ List<String> actual = new ArrayList<>();
+ while (it.hasNext()) {
+ actual.add(toString(it.next()));
+ }
+ assertThat(actual)
+                .hasSameElementsAs(Arrays.asList("+I 3|20221208|15|0|0", "+I 3|20221209|15|0|0"));
+
+ write.close();
+ commit.close();
+ it.close();
+ }
+
private String toString(RowData rowData) {
int numFiles;
try {
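
For reference, the idle-time filter is a single extra builder call on the batch
compactor source; a minimal sketch mirroring the test above (table and env are
assumed from the surrounding harness, and the one-hour threshold is
illustrative):

    DataStreamSource<RowData> compactorSource =
            new CompactorSourceBuilder("compact-job", table)
                    .withContinuousMode(false)                  // one-shot batch plan
                    .withPartitionIdleTime(Duration.ofHours(1)) // skip recently written partitions
                    .withEnv(env)
                    .build();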
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
index d1acde411..fba5f3380 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java
@@ -53,6 +53,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -491,6 +492,109 @@ public class MultiTablesCompactorSourceBuilderITCase extends AbstractTestBase
it.close();
}
+ @ParameterizedTest(name = "defaultOptions = {0}")
+ @ValueSource(booleans = {true, false})
+    public void testHistoryPartitionRead(boolean defaultOptions) throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.WRITE_ONLY.key(), "true");
+ if (!defaultOptions) {
+            // change options to test whether CompactorSourceBuilder works normally
+ options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "2");
+ }
+ options.put("bucket", "1");
+ long monitorInterval = 1000;
+ Duration partitionIdleTime = Duration.ofMillis(3000);
+ List<FileStoreTable> tables = new ArrayList<>();
+
+ for (String dbName : DATABASE_NAMES) {
+ for (String tableName : TABLE_NAMES) {
+ FileStoreTable table =
+ createTable(
+ dbName,
+ tableName,
+ ROW_TYPE_MAP.get(tableName),
+ Arrays.asList("dt", "hh"),
+ Arrays.asList("dt", "hh", "k"),
+ options);
+ tables.add(table);
+ SnapshotManager snapshotManager = table.snapshotManager();
+                StreamWriteBuilder streamWriteBuilder =
+                        table.newStreamWriteBuilder().withCommitUser(commitUser);
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
+
+                writeData(
+                        write,
+                        commit,
+                        0,
+                        rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                        rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                        rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+                writeData(
+                        write,
+                        commit,
+                        1,
+                        rowData(2, 100, 15, BinaryString.fromString("20221208")),
+                        rowData(2, 100, 16, BinaryString.fromString("20221208")),
+                        rowData(2, 100, 15, BinaryString.fromString("20221209")));
+
+                Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+                assertThat(snapshot.id()).isEqualTo(2);
+                assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+
+ write.close();
+ commit.close();
+ }
+ }
+
+        // sleep 3 seconds, then update partition 20221208-16
+ Thread.sleep(3000);
+
+ for (FileStoreTable table : tables) {
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
+            writeData(write, commit, 2, rowData(3, 100, 16, BinaryString.fromString("20221208")));
+ }
+
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder()
+ .batchMode()
+                        .parallelism(ThreadLocalRandom.current().nextInt(2) + 1)
+ .build();
+
+ DataStream<RowData> source =
+ new CombinedTableCompactorSourceBuilder(
+ catalogLoader(),
+ Pattern.compile("db1|db2"),
+ Pattern.compile(".*"),
+ null,
+ monitorInterval)
+ .withPartitionIdleTime(partitionIdleTime)
+ .withContinuousMode(false)
+ .withEnv(env)
+ .buildAwareBucketTableSource();
+ CloseableIterator<RowData> it = source.executeAndCollect();
+ List<String> actual = new ArrayList<>();
+ while (it.hasNext()) {
+ actual.add(toString(it.next()));
+ }
+ assertThat(actual)
+ .hasSameElementsAs(
+ Arrays.asList(
+ "+I 3|20221208|15|0|0|db1|t1",
+ "+I 3|20221209|15|0|0|db1|t1",
+ "+I 3|20221208|15|0|0|db1|t2",
+ "+I 3|20221209|15|0|0|db1|t2",
+ "+I 3|20221208|15|0|0|db2|t1",
+ "+I 3|20221209|15|0|0|db2|t1",
+ "+I 3|20221208|15|0|0|db2|t2",
+ "+I 3|20221209|15|0|0|db2|t2"));
+ it.close();
+ }
+
private FileStoreTable createTable(
String databaseName,
String tableName,
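
The combined-mode source exposes the same knob; a minimal sketch mirroring the
test above (catalogLoader() and env come from the surrounding harness, and the
patterns select which databases and tables are compacted):

    DataStream<RowData> source =
            new CombinedTableCompactorSourceBuilder(
                            catalogLoader(),
                            Pattern.compile("db1|db2"), // including databases
                            Pattern.compile(".*"),      // including tables
                            null,                       // no excluding pattern
                            1000L)                      // monitor interval in ms
                    .withPartitionIdleTime(Duration.ofHours(1))
                    .withContinuousMode(false)
                    .withEnv(env)
                    .buildAwareBucketTableSource();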
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 1c1d73860..b541249bd 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -24,6 +24,7 @@ import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.spark.PaimonSplitScan;
@@ -48,6 +49,7 @@ import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.utils.TimeUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -71,6 +73,9 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -80,6 +85,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -113,6 +119,7 @@ public class CompactProcedure extends BaseProcedure {
ProcedureParameter.optional("where", StringType),
ProcedureParameter.optional("max_concurrent_jobs",
IntegerType),
ProcedureParameter.optional("options", StringType),
+ ProcedureParameter.optional("partition_idle_time", StringType),
};
private static final StructType OUTPUT_TYPE =
@@ -147,10 +154,16 @@ public class CompactProcedure extends BaseProcedure {
String where = blank(args, 4) ? null : args.getString(4);
int maxConcurrentJobs = args.isNullAt(5) ? 15 : args.getInt(5);
String options = args.isNullAt(6) ? null : args.getString(6);
+        Duration partitionIdleTime =
+                blank(args, 7) ? null : TimeUtils.parseDuration(args.getString(7));
        if (TableSorter.OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
            throw new IllegalArgumentException(
                    "order_strategy \"none\" cannot work with order_by columns.");
}
+        if (partitionIdleTime != null && (!TableSorter.OrderType.NONE.name().equals(sortType))) {
+ throw new IllegalArgumentException(
+ "sort compact do not support 'partition_idle_time'.");
+ }
checkArgument(
partitions == null || where == null,
"partitions and where cannot be used together.");
@@ -193,7 +206,8 @@ public class CompactProcedure extends BaseProcedure {
sortColumns,
relation,
condition,
- maxConcurrentJobs));
+ maxConcurrentJobs,
+ partitionIdleTime));
return new InternalRow[] {internalRow};
});
}
@@ -213,7 +227,8 @@ public class CompactProcedure extends BaseProcedure {
List<String> sortColumns,
DataSourceV2Relation relation,
@Nullable Expression condition,
- int maxConcurrentJobs) {
+ int maxConcurrentJobs,
+ @Nullable Duration partitionIdleTime) {
BucketMode bucketMode = table.bucketMode();
TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
Predicate filter =
@@ -230,10 +245,10 @@ public class CompactProcedure extends BaseProcedure {
switch (bucketMode) {
case HASH_FIXED:
case HASH_DYNAMIC:
- compactAwareBucketTable(table, filter, javaSparkContext);
+            compactAwareBucketTable(table, filter, partitionIdleTime, javaSparkContext);
break;
case BUCKET_UNAWARE:
- compactUnAwareBucketTable(table, filter, javaSparkContext);
+            compactUnAwareBucketTable(table, filter, partitionIdleTime, javaSparkContext);
break;
default:
throw new UnsupportedOperationException(
@@ -256,17 +271,22 @@ public class CompactProcedure extends BaseProcedure {
}
private void compactAwareBucketTable(
-            FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) {
+ FileStoreTable table,
+ @Nullable Predicate filter,
+ @Nullable Duration partitionIdleTime,
+ JavaSparkContext javaSparkContext) {
SnapshotReader snapshotReader = table.newSnapshotReader();
if (filter != null) {
snapshotReader.withFilter(filter);
}
-
+ Set<BinaryRow> partitionToBeCompacted =
+ getHistoryPartition(snapshotReader, partitionIdleTime);
List<Pair<byte[], Integer>> partitionBuckets =
snapshotReader.read().splits().stream()
.map(split -> (DataSplit) split)
                        .map(dataSplit -> Pair.of(dataSplit.partition(), dataSplit.bucket()))
.distinct()
+                        .filter(pair -> partitionToBeCompacted.contains(pair.getKey()))
.map(
p ->
Pair.of(
@@ -329,9 +349,30 @@ public class CompactProcedure extends BaseProcedure {
}
private void compactUnAwareBucketTable(
-            FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) {
+ FileStoreTable table,
+ @Nullable Predicate filter,
+ @Nullable Duration partitionIdleTime,
+ JavaSparkContext javaSparkContext) {
List<UnawareAppendCompactionTask> compactionTasks =
                new UnawareAppendTableCompactionCoordinator(table, false, filter).run();
+ if (partitionIdleTime != null) {
+ Map<BinaryRow, Long> partitionInfo =
+ table.newSnapshotReader().partitionEntries().stream()
+ .collect(
+ Collectors.toMap(
+ PartitionEntry::partition,
+                                            PartitionEntry::lastFileCreationTime));
+ long historyMilli =
+ LocalDateTime.now()
+ .minus(partitionIdleTime)
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+ compactionTasks =
+ compactionTasks.stream()
+                            .filter(task -> partitionInfo.get(task.partition()) <= historyMilli)
+ .collect(Collectors.toList());
+ }
if (compactionTasks.isEmpty()) {
return;
}
@@ -392,6 +433,31 @@ public class CompactProcedure extends BaseProcedure {
}
}
+ private Set<BinaryRow> getHistoryPartition(
+            SnapshotReader snapshotReader, @Nullable Duration partitionIdleTime) {
+ Set<Pair<BinaryRow, Long>> partitionInfo =
+ snapshotReader.partitionEntries().stream()
+ .map(
+ partitionEntry ->
+ Pair.of(
+ partitionEntry.partition(),
+                                                partitionEntry.lastFileCreationTime()))
+ .collect(Collectors.toSet());
+ if (partitionIdleTime != null) {
+ long historyMilli =
+ LocalDateTime.now()
+ .minus(partitionIdleTime)
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+ partitionInfo =
+ partitionInfo.stream()
+                            .filter(partition -> partition.getValue() <= historyMilli)
+ .collect(Collectors.toSet());
+ }
+        return partitionInfo.stream().map(Pair::getKey).collect(Collectors.toSet());
+ }
+
private void sortCompactUnAwareBucketTable(
FileStoreTable table,
TableSorter.OrderType orderType,
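
The cutoff arithmetic above deserves a note: a partition qualifies as
historical when its last file creation time is no later than "now minus
partition_idle_time". A standalone sketch (the method name is illustrative;
imports are java.time.Duration, java.time.LocalDateTime, java.time.ZoneId):

    static long idleCutoffMillis(Duration partitionIdleTime) {
        // Equivalent to System.currentTimeMillis() - partitionIdleTime.toMillis(),
        // spelled via LocalDateTime to match the patch.
        return LocalDateTime.now()
                .minus(partitionIdleTime)
                .atZone(ZoneId.systemDefault())
                .toInstant()
                .toEpochMilli();
    }

    // A partition with lastFileCreationTime <= idleCutoffMillis(Duration.ofDays(1))
    // has been idle for at least a day and is selected for compaction.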
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 73d6fc442..c522c1c9b 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.Snapshot.CommitKind
import org.apache.paimon.fs.Path
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.source.DataSplit
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
@@ -480,6 +481,11 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
assert(intercept[IllegalArgumentException] {
spark.sql("CALL sys.compact(table => 'T', order_strategy => 'sort',
order_by => 'pt')")
}.getMessage.contains("order_by should not contain partition cols"))
+
+ assert(intercept[IllegalArgumentException] {
+ spark.sql(
+        "CALL sys.compact(table => 'T', order_strategy => 'sort', order_by => 'id', partition_idle_time =>'5s')")
+ }.getMessage.contains("sort compact do not support 'partition_idle_time'"))
}
test("Paimon Procedure: compact with where") {
@@ -561,6 +567,87 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
Row(5, "e", "p1") :: Row(6, "f", "p2") :: Nil)
}
+ test("Paimon Procedure: compact with partition_idle_time for pk table") {
+ Seq(1, -1).foreach(
+ bucket => {
+ withTable("T") {
+        val dynamicBucketArgs = if (bucket == -1) " ,'dynamic-bucket.initial-buckets'='1'" else ""
+ spark.sql(
+ s"""
+ |CREATE TABLE T (id INT, value STRING, dt STRING, hh INT)
+            |TBLPROPERTIES ('primary-key'='id, dt, hh', 'bucket'='$bucket', 'write-only'='true'$dynamicBucketArgs)
+ |PARTITIONED BY (dt, hh)
+ |""".stripMargin)
+
+ val table = loadTable("T")
+
+          spark.sql(s"INSERT INTO T VALUES (1, '1', '2024-01-01', 0), (2, '2', '2024-01-01', 1)")
+          spark.sql(s"INSERT INTO T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)")
+          spark.sql(s"INSERT INTO T VALUES (3, '3', '2024-01-01', 0), (4, '4', '2024-01-01', 1)")
+          spark.sql(s"INSERT INTO T VALUES (7, '7', '2024-01-02', 0), (8, '8', '2024-01-02', 1)")
+
+ Thread.sleep(10000);
+          spark.sql(s"INSERT INTO T VALUES (9, '9', '2024-01-01', 0), (10, '10', '2024-01-02', 0)")
+
+          spark.sql("CALL sys.compact(table => 'T', partition_idle_time => '10s')")
+          val dataSplits = table.newSnapshotReader.read.dataSplits.asScala.toList
+ Assertions
+ .assertThat(dataSplits.size)
+ .isEqualTo(4)
+          Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+ for (dataSplit: DataSplit <- dataSplits) {
+ if (dataSplit.partition().getInt(1) == 0) {
+ Assertions
+ .assertThat(dataSplit.dataFiles().size())
+ .isEqualTo(3)
+ } else {
+ Assertions
+ .assertThat(dataSplit.dataFiles().size())
+ .isEqualTo(1)
+ }
+ }
+ }
+ })
+
+ }
+
+  test("Paimon Procedure: compact with partition_idle_time for unaware bucket append table") {
+ spark.sql(
+ s"""
+ |CREATE TABLE T (id INT, value STRING, dt STRING, hh INT)
+         |TBLPROPERTIES ('bucket'='-1', 'write-only'='true', 'compaction.min.file-num'='2', 'compaction.max.file-num'='2')
+ |PARTITIONED BY (dt, hh)
+ |""".stripMargin)
+
+ val table = loadTable("T")
+
+    spark.sql(s"INSERT INTO T VALUES (1, '1', '2024-01-01', 0), (2, '2', '2024-01-01', 1)")
+    spark.sql(s"INSERT INTO T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)")
+    spark.sql(s"INSERT INTO T VALUES (3, '3', '2024-01-01', 0), (4, '4', '2024-01-01', 1)")
+    spark.sql(s"INSERT INTO T VALUES (7, '7', '2024-01-02', 0), (8, '8', '2024-01-02', 1)")
+
+ Thread.sleep(10000);
+    spark.sql(s"INSERT INTO T VALUES (9, '9', '2024-01-01', 0), (10, '10', '2024-01-02', 0)")
+
+ spark.sql("CALL sys.compact(table => 'T', partition_idle_time => '10s')")
+ val dataSplits = table.newSnapshotReader.read.dataSplits.asScala.toList
+ Assertions
+ .assertThat(dataSplits.size)
+ .isEqualTo(4)
+    Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+ for (dataSplit: DataSplit <- dataSplits) {
+ if (dataSplit.partition().getInt(1) == 0) {
+ Assertions
+ .assertThat(dataSplit.dataFiles().size())
+ .isEqualTo(3)
+ } else {
+ Assertions
+ .assertThat(dataSplit.dataFiles().size())
+ .isEqualTo(1)
+ }
+ }
+ }
+
def lastSnapshotCommand(table: FileStoreTable): CommitKind = {
table.snapshotManager().latestSnapshot().commitKind()
}
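
Taken together with the Flink entry points above, the Spark side exposes the
same knob through sys.compact; a minimal sketch (spark is an assumed
SparkSession whose current catalog is a Paimon catalog, and '1 d' is an
illustrative threshold):

    // Full-compact only the partitions of T that have been idle for a day.
    spark.sql("CALL sys.compact(table => 'T', partition_idle_time => '1 d')");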