This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 88b6dd0fd [flink] Refactor Flink Database compaction job (#1987)
88b6dd0fd is described below
commit 88b6dd0fd41b1651b80393d42e662837a5f40ba3
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Sep 12 14:40:31 2023 +0800
[flink] Refactor Flink Database compaction job (#1987)
---
docs/content/concepts/file-layouts.md | 2 +-
...multiple-writers.md => dedicated-compaction.md} | 79 ++++++++++++----------
.../paimon/flink/action/CompactDatabaseAction.java | 12 ++--
.../flink/action/CompactDatabaseActionFactory.java | 2 +-
.../flink/action/CompactDatabaseActionITCase.java | 16 ++---
5 files changed, 60 insertions(+), 51 deletions(-)
diff --git a/docs/content/concepts/file-layouts.md b/docs/content/concepts/file-layouts.md
index fede66ff6..9b14b893a 100644
--- a/docs/content/concepts/file-layouts.md
+++ b/docs/content/concepts/file-layouts.md
@@ -77,4 +77,4 @@ To limit the number of sorted runs, we have to merge several sorted runs into on
However, compaction is a resource intensive procedure which consumes a certain amount of CPU time and disk IO, so too frequent compaction may in turn result in slower writes. It is a trade-off between query and write performance. Paimon currently adapts a compaction strategy similar to Rocksdb's [universal compaction](https://github.com/facebook/rocksdb/wiki/Universal-Compaction).
-By default, when Paimon appends records to the LSM tree, it will also perform compactions as needed. Users can also choose to perform all compactions in a dedicated compaction job. See [dedicated compaction job]({{< ref "maintenance/multiple-writers#dedicated-compaction-job" >}}) for more info.
+By default, when Paimon appends records to the LSM tree, it will also perform compactions as needed. Users can also choose to perform all compactions in a dedicated compaction job. See [dedicated compaction job]({{< ref "maintenance/dedicated-compaction#dedicated-compaction-job" >}}) for more info.
diff --git a/docs/content/maintenance/multiple-writers.md b/docs/content/maintenance/dedicated-compaction.md
similarity index 88%
rename from docs/content/maintenance/multiple-writers.md
rename to docs/content/maintenance/dedicated-compaction.md
index 1db009d79..e3f174015 100644
--- a/docs/content/maintenance/multiple-writers.md
+++ b/docs/content/maintenance/dedicated-compaction.md
@@ -1,9 +1,9 @@
---
-title: "Multiple Writers"
+title: "Dedicated Compaction"
weight: 3
type: docs
aliases:
-- /maintenance/multiple-writers.html
+- /maintenance/dedicated-compaction.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -24,7 +24,7 @@ specific language governing permissions and limitations under the License.
-->
-# Multiple Writers
+# Dedicated Compaction
Paimon's snapshot management supports writing with multiple writers.
@@ -96,7 +96,41 @@ Run the following command to submit a compaction job for the table.
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
```
-Or run the following command to submit a compaction job for multiple database.
+Example: compact table
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-0.6-SNAPSHOT.jar \
+ compact \
+ --warehouse s3:///path/to/warehouse \
+ --database test_db \
+ --table test_table \
+ --partition dt=20221126,hh=08 \
+ --partition dt=20221127,hh=09 \
+ --catalog-conf s3.endpoint=https://****.com \
+ --catalog-conf s3.access-key=***** \
+ --catalog-conf s3.secret-key=*****
+```
+
+For more usage of the compact action, see
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ compact --help
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Database Compaction Job
+
+You can run the following command to submit a compaction job for multiple databases.
+
+{{< tabs "database-compaction-job" >}}
+
+{{< tab "Flink" >}}
```bash
<FLINK_HOME>/bin/flink run \
@@ -108,7 +142,7 @@ Or run the following command to submit a compaction job for multiple database.
[--excluding-tables <paimon-table-name|name-regular-expr>] \
[--mode <compact-mode>] \
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
-    [--compact-conf <paimon-compact-conf> [--compact-conf <paimon-compact-conf> ...]]
+ [--table-conf <paimon-table-conf> [--table-conf <paimon-table-conf> ...]]
```
* `--including-databases` is used to specify which database is to be compacted. In compact mode, you need to specify a database name, in compact-database mode, you could specify multiple database, regular expression is supported.
@@ -118,14 +152,12 @@ Or run the following command to submit a compaction job for multiple database.
* "divided" (the default mode if you haven't specified one): start a sink
for each table, the compaction of the new table requires restarting the job.
* "combined": start a single combined sink for all tables, the new table
will be automatically compacted.
* `--catalog-conf` is the configuration for Paimon catalog. Each configuration
should be specified in the format `key=value`. See [here]({{< ref
"maintenance/configurations" >}}) for a complete list of catalog configurations.
-* `--compact-conf` is the configuration for compaction in combined mode. Each configuration should be specified in the format `key=value`. Compact configuration is listed below:
+* `--table-conf` is the configuration for compaction. Each configuration should be specified in the format `key=value`. The pivotal configurations are listed below:
| Key | Default | Type | Description |
|-----------------------------------|---------|------------|-------------|
| continuous.discovery-interval | 10 s | Duration | The discovery interval of continuous reading. |
| sink.parallelism | (none) | Integer | Defines a custom parallelism for the sink. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. |
-| sink.use-managed-memory-allocator | false | Boolean | If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator. |
-| sink.managed.writer-buffer-memory | 256 mb | MemorySize | Weight of writer buffer in managed memory, Flink will compute the memory size for writer according to the weight, the actual memory used depends on the running environment. |
If you submit a batch job (set `execution.runtime-mode: batch` in Flink's configuration), all current table files will be compacted. If you submit a streaming job (set `execution.runtime-mode: streaming` in Flink's configuration), the job will continuously monitor new changes to the table and perform compactions as needed.
@@ -141,23 +173,7 @@ You can set `--mode combined` to enable compacting newly added tables without re
{{< /hint >}}
-Example1: compact table
-
-```bash
-<FLINK_HOME>/bin/flink run \
- /path/to/paimon-flink-action-{{< version >}}.jar \
- compact \
- --warehouse s3:///path/to/warehouse \
- --database test_db \
- --table test_table \
- --partition dt=20221126,hh=08 \
- --partition dt=20221127,hh=09 \
- --catalog-conf s3.endpoint=https://****.com \
- --catalog-conf s3.access-key=***** \
- --catalog-conf s3.secret-key=*****
-```
-
-Example2: compact database
+Example1: compact database
```bash
<FLINK_HOME>/bin/flink run \
@@ -170,7 +186,7 @@ Example2: compact database
--catalog-conf s3.secret-key=*****
```
-Example3: compact database in combined mode
+Example2: compact database in combined mode
```bash
<FLINK_HOME>/bin/flink run \
@@ -182,17 +198,10 @@ Example3: compact database in combined mode
--catalog-conf s3.endpoint=https://****.com \
--catalog-conf s3.access-key=***** \
--catalog-conf s3.secret-key=***** \
- --compact-conf continuous.discovery-interval=*****
+ --table-conf continuous.discovery-interval=*****
```
-For more usage of the compact action, see
-
-```bash
-<FLINK_HOME>/bin/flink run \
- /path/to/paimon-flink-action-{{< version >}}.jar \
- compact --help
-```
-or
+For more usage of the compact-database action, see
```bash
<FLINK_HOME>/bin/flink run \
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
index e56c1f549..e07ec9bc9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -68,7 +68,7 @@ public class CompactDatabaseAction extends ActionBase {
private final Map<String, Table> tableMap = new HashMap<>();
- private Options compactOptions = new Options();
+ private Options tableOptions = new Options();
    public CompactDatabaseAction(String warehouse, Map<String, String> catalogConfig) {
super(warehouse, catalogConfig);
@@ -98,8 +98,8 @@ public class CompactDatabaseAction extends ActionBase {
return this;
}
-    public CompactDatabaseAction withCompactOptions(Map<String, String> compactOptions) {
- this.compactOptions = Options.fromMap(compactOptions);
+    public CompactDatabaseAction withTableOptions(Map<String, String> tableOptions) {
+ this.tableOptions = Options.fromMap(tableOptions);
return this;
}
@@ -200,7 +200,7 @@ public class CompactDatabaseAction extends ActionBase {
databasePattern,
includingPattern,
excludingPattern,
-                        compactOptions.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis());
+                        tableOptions.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis());
DataStream<RowData> source =
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
@@ -208,8 +208,8 @@ public class CompactDatabaseAction extends ActionBase {
partition(
source,
new BucketsRowChannelComputer(),
-                        compactOptions.get(FlinkConnectorOptions.SINK_PARALLELISM));
-        new MultiTablesCompactorSink(catalogLoader(), compactOptions).sinkFrom(partitioned);
+                        tableOptions.get(FlinkConnectorOptions.SINK_PARALLELISM));
+        new MultiTablesCompactorSink(catalogLoader(), tableOptions).sinkFrom(partitioned);
}
private void buildForTraditionalCompaction(
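For context on the rename above, here is a minimal caller-side sketch of the fluent API after this change. It is not taken from the commit: the warehouse path, catalog config, and option value are placeholders, and it simply mirrors the usage visible in CompactDatabaseActionITCase further down.

```java
// Sketch of driving CompactDatabaseAction programmatically after the rename from
// withCompactOptions(...) to withTableOptions(...). Paths and values are invented.
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.action.CompactDatabaseAction;

public class CompactDatabaseSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // What used to be passed as "compact options" is now plain table configuration.
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");

        Map<String, String> catalogConfig = new HashMap<>();
        new CompactDatabaseAction("/tmp/paimon", catalogConfig)
                .includingTables(null)            // null = no include filter
                .excludingTables(null)            // null = no exclude filter
                .withDatabaseCompactMode("combined")
                .withTableOptions(tableOptions)
                .build(env);

        env.execute("compact-database");
    }
}
```

As in the test below, `build(env)` only wires the compaction topology into the Flink environment; the job starts when the environment is executed.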
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
index 559c28ed4..b7a1a776a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
@@ -43,7 +43,7 @@ public class CompactDatabaseActionFactory implements ActionFactory {
.includingTables(params.get("including-tables"))
.excludingTables(params.get("excluding-tables"))
.withDatabaseCompactMode(params.get("mode"))
- .withCompactOptions(optionalConfigMap(params, "compact-conf"));
+ .withTableOptions(optionalConfigMap(params, "table-conf"));
return Optional.of(action);
}
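The factory now reads the renamed flag via `optionalConfigMap(params, "table-conf")`, whose implementation is not shown in this patch. As a simplified, purely illustrative stand-in for that behavior (not Paimon's actual code), repeated `--table-conf key=value` arguments fold into a single map like this:

```java
// Illustrative only: a hypothetical helper that collects repeated
// "--table-conf key=value" arguments into the Map<String, String> handed to
// withTableOptions(...). This is not the Paimon implementation.
import java.util.HashMap;
import java.util.Map;

public class TableConfArgsSketch {
    static Map<String, String> collectTableConf(String[] args) {
        Map<String, String> tableConf = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i++) {
            if ("--table-conf".equals(args[i])) {
                String[] kv = args[i + 1].split("=", 2); // split on the first '=' only
                if (kv.length == 2) {
                    tableConf.put(kv[0], kv[1]);
                }
            }
        }
        return tableConf;
    }

    public static void main(String[] args) {
        Map<String, String> conf =
                collectTableConf(
                        new String[] {
                            "--table-conf", "continuous.discovery-interval=1s",
                            "--table-conf", "sink.parallelism=4"
                        });
        System.out.println(conf); // contains both entries
    }
}
```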
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index 0d15e9606..2c112f290 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -97,7 +97,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.WRITE_ONLY.key(), "true");
- Map<String, String> compactOptions = new HashMap<>();
+ Map<String, String> tableOptions = new HashMap<>();
List<FileStoreTable> tables = new ArrayList<>();
@@ -149,7 +149,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
.includingTables(null)
.excludingTables(null)
.withDatabaseCompactMode("combined")
- .withCompactOptions(compactOptions)
+ .withTableOptions(tableOptions)
.build(env);
}
@@ -184,10 +184,10 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
- Map<String, String> compactOptions = new HashMap<>();
+ Map<String, String> tableOptions = new HashMap<>();
        // if CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() use default value, the cost time in
        // combined mode will be over 1mins
-        compactOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
List<FileStoreTable> tables = new ArrayList<>();
for (String dbName : DATABASE_NAMES) {
@@ -241,7 +241,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
.includingTables(null)
.excludingTables(null)
.withDatabaseCompactMode("combined")
- .withCompactOptions(compactOptions)
+ .withTableOptions(tableOptions)
.build(env);
}
JobClient client = env.executeAsync();
@@ -442,8 +442,8 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.WRITE_ONLY.key(), "true");
- Map<String, String> compactOptions = new HashMap<>();
-        compactOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+ Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
List<FileStoreTable> compactionTables = new ArrayList<>();
List<FileStoreTable> noCompactionTables = new ArrayList<>();
@@ -502,7 +502,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
.includingTables(includingPattern)
.excludingTables(excludesPattern)
.withDatabaseCompactMode("combined")
- .withCompactOptions(compactOptions)
+ .withTableOptions(tableOptions)
.build(env);
}
env.execute();