This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 88b6dd0fd [flink] Refactor Flink Database compaction job (#1987)
88b6dd0fd is described below

commit 88b6dd0fd41b1651b80393d42e662837a5f40ba3
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Sep 12 14:40:31 2023 +0800

    [flink] Refactor Flink Database compaction job (#1987)
---
 docs/content/concepts/file-layouts.md              |  2 +-
 ...multiple-writers.md => dedicated-compaction.md} | 79 ++++++++++++----------
 .../paimon/flink/action/CompactDatabaseAction.java | 12 ++--
 .../flink/action/CompactDatabaseActionFactory.java |  2 +-
 .../flink/action/CompactDatabaseActionITCase.java  | 16 ++---
 5 files changed, 60 insertions(+), 51 deletions(-)

diff --git a/docs/content/concepts/file-layouts.md 
b/docs/content/concepts/file-layouts.md
index fede66ff6..9b14b893a 100644
--- a/docs/content/concepts/file-layouts.md
+++ b/docs/content/concepts/file-layouts.md
@@ -77,4 +77,4 @@ To limit the number of sorted runs, we have to merge several 
sorted runs into on
 
 However, compaction is a resource intensive procedure which consumes a certain 
amount of CPU time and disk IO, so too frequent compaction may in turn result 
in slower writes. It is a trade-off between query and write performance. Paimon 
currently adapts a compaction strategy similar to Rocksdb's [universal 
compaction](https://github.com/facebook/rocksdb/wiki/Universal-Compaction).
 
-By default, when Paimon appends records to the LSM tree, it will also perform 
compactions as needed. Users can also choose to perform all compactions in a 
dedicated compaction job. See [dedicated compaction job]({{< ref 
"maintenance/multiple-writers#dedicated-compaction-job" >}}) for more info.
+By default, when Paimon appends records to the LSM tree, it will also perform 
compactions as needed. Users can also choose to perform all compactions in a 
dedicated compaction job. See [dedicated compaction job]({{< ref 
"maintenance/dedicated-compaction#dedicated-compaction-job" >}}) for more info.
diff --git a/docs/content/maintenance/multiple-writers.md 
b/docs/content/maintenance/dedicated-compaction.md
similarity index 88%
rename from docs/content/maintenance/multiple-writers.md
rename to docs/content/maintenance/dedicated-compaction.md
index 1db009d79..e3f174015 100644
--- a/docs/content/maintenance/multiple-writers.md
+++ b/docs/content/maintenance/dedicated-compaction.md
@@ -1,9 +1,9 @@
 ---
-title: "Multiple Writers"
+title: "Dedicated Compaction"
 weight: 3
 type: docs
 aliases:
-- /maintenance/multiple-writers.html
+- /maintenance/dedicated-compaction.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,7 +24,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Multiple Writers
+# Dedicated Compaction
 
 Paimon's snapshot management supports writing with multiple writers.
 
@@ -96,7 +96,41 @@ Run the following command to submit a compaction job for the 
table.
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]] \
 ```
 
-Or run the following command to submit a compaction job for multiple database.
+Example: compact table
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact \
+    --warehouse s3:///path/to/warehouse \
+    --database test_db \
+    --table test_table \
+    --partition dt=20221126,hh=08 \
+    --partition dt=20221127,hh=09 \
+    --catalog-conf s3.endpoint=https://****.com \
+    --catalog-conf s3.access-key=***** \
+    --catalog-conf s3.secret-key=*****
+```
+
+For more usage of the compact action, see
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact --help
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Database Compaction Job
+
+You can run the following command to submit a compaction job for multiple 
databases.
+
+{{< tabs "database-compaction-job" >}}
+
+{{< tab "Flink" >}}
 
 ```bash
 <FLINK_HOME>/bin/flink run \
@@ -108,7 +142,7 @@ Or run the following command to submit a compaction job for 
multiple database.
     [--excluding-tables <paimon-table-name|name-regular-expr>] \
     [--mode <compact-mode>] \
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]] \
-    [--compact-conf <paimon-compact-conf> [--compact-conf 
<paimon-compact-conf> ...]]
+    [--table-conf <paimon-table-conf> [--table-conf <paimon-table-conf> ...]]
 ```
 
 * `--including-databases` is used to specify which database is to be 
compacted. In compact mode, you need to specify a database name, in 
compact-database mode, you could specify multiple database, regular expression 
is supported.
@@ -118,14 +152,12 @@ Or run the following command to submit a compaction job 
for multiple database.
   * "divided" (the default mode if you haven't specified one): start a sink 
for each table, the compaction of the new table requires restarting the job.
   * "combined": start a single combined sink for all tables, the new table 
will be automatically compacted.
 * `--catalog-conf` is the configuration for Paimon catalog. Each configuration 
should be specified in the format `key=value`. See [here]({{< ref 
"maintenance/configurations" >}}) for a complete list of catalog configurations.
-* `--compact-conf` is the configuration for compaction in combined mode. Each 
configuration should be specified in the format `key=value`. Compact 
configuration is listed below:
+* `--table-conf` is the configuration for compaction. Each configuration 
should be specified in the format `key=value`. The key configurations are listed 
below:
 
 | Key                               | Default | Type       | Description       
                                                                                
                                                                                
                          |
 
|-----------------------------------|---------|------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | continuous.discovery-interval     | 10 s    | Duration   | The discovery 
interval of continuous reading.                                                 
                                                                                
                              |
 | sink.parallelism                  | (none)  | Integer    | Defines a custom 
parallelism for the sink. By default, if this option is not defined, the 
planner will derive the parallelism for each statement individually by also 
considering the global configuration. |
-| sink.use-managed-memory-allocator | false   | Boolean    | If true, flink 
sink will use managed memory for merge tree; otherwise, it will create an 
independent memory allocator.                                                   
                                   |
-| sink.managed.writer-buffer-memory | 256 mb  | MemorySize | Weight of writer 
buffer in managed memory, Flink will compute the memory size for writer 
according to the weight, the actual memory used depends on the running 
environment.                                |
 
 If you submit a batch job (set `execution.runtime-mode: batch` in Flink's 
configuration), all current table files will be compacted. If you submit a 
streaming job (set `execution.runtime-mode: streaming` in Flink's 
configuration), the job will continuously monitor new changes to the table and 
perform compactions as needed.
 
@@ -141,23 +173,7 @@ You can set `--mode combined` to enable compacting newly 
added tables without re
 
 {{< /hint >}}
 
-Example1: compact table
-
-```bash
-<FLINK_HOME>/bin/flink run \
-    /path/to/paimon-flink-action-{{< version >}}.jar \
-    compact \
-    --warehouse s3:///path/to/warehouse \
-    --database test_db \
-    --table test_table \
-    --partition dt=20221126,hh=08 \
-    --partition dt=20221127,hh=09 \
-    --catalog-conf s3.endpoint=https://****.com \
-    --catalog-conf s3.access-key=***** \
-    --catalog-conf s3.secret-key=*****
-```
-
-Example2: compact database
+Example1: compact database
 
 ```bash
 <FLINK_HOME>/bin/flink run \
@@ -170,7 +186,7 @@ Example2: compact database
     --catalog-conf s3.secret-key=*****
 ```
 
-Example3: compact database in combined mode
+Example2: compact database in combined mode
 
 ```bash
 <FLINK_HOME>/bin/flink run \
@@ -182,17 +198,10 @@ Example3: compact database in combined mode
     --catalog-conf s3.endpoint=https://****.com \
     --catalog-conf s3.access-key=***** \
     --catalog-conf s3.secret-key=***** \
-    --compact-conf continuous.discovery-interval=*****
+    --table-conf continuous.discovery-interval=*****
 ```
 
-For more usage of the compact action, see
-
-```bash
-<FLINK_HOME>/bin/flink run \
-    /path/to/paimon-flink-action-{{< version >}}.jar \
-    compact --help
-```
-or
+For more usage of the compact-database action, see
 
 ```bash
 <FLINK_HOME>/bin/flink run \
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
index e56c1f549..e07ec9bc9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -68,7 +68,7 @@ public class CompactDatabaseAction extends ActionBase {
 
     private final Map<String, Table> tableMap = new HashMap<>();
 
-    private Options compactOptions = new Options();
+    private Options tableOptions = new Options();
 
     public CompactDatabaseAction(String warehouse, Map<String, String> 
catalogConfig) {
         super(warehouse, catalogConfig);
@@ -98,8 +98,8 @@ public class CompactDatabaseAction extends ActionBase {
         return this;
     }
 
-    public CompactDatabaseAction withCompactOptions(Map<String, String> 
compactOptions) {
-        this.compactOptions = Options.fromMap(compactOptions);
+    public CompactDatabaseAction withTableOptions(Map<String, String> 
tableOptions) {
+        this.tableOptions = Options.fromMap(tableOptions);
         return this;
     }
 
@@ -200,7 +200,7 @@ public class CompactDatabaseAction extends ActionBase {
                         databasePattern,
                         includingPattern,
                         excludingPattern,
-                        
compactOptions.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis());
+                        
tableOptions.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis());
         DataStream<RowData> source =
                 
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
 
@@ -208,8 +208,8 @@ public class CompactDatabaseAction extends ActionBase {
                 partition(
                         source,
                         new BucketsRowChannelComputer(),
-                        
compactOptions.get(FlinkConnectorOptions.SINK_PARALLELISM));
-        new MultiTablesCompactorSink(catalogLoader(), 
compactOptions).sinkFrom(partitioned);
+                        
tableOptions.get(FlinkConnectorOptions.SINK_PARALLELISM));
+        new MultiTablesCompactorSink(catalogLoader(), 
tableOptions).sinkFrom(partitioned);
     }
 
     private void buildForTraditionalCompaction(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
index 559c28ed4..b7a1a776a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
@@ -43,7 +43,7 @@ public class CompactDatabaseActionFactory implements 
ActionFactory {
                 .includingTables(params.get("including-tables"))
                 .excludingTables(params.get("excluding-tables"))
                 .withDatabaseCompactMode(params.get("mode"))
-                .withCompactOptions(optionalConfigMap(params, "compact-conf"));
+                .withTableOptions(optionalConfigMap(params, "table-conf"));
 
         return Optional.of(action);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index 0d15e9606..2c112f290 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -97,7 +97,7 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.WRITE_ONLY.key(), "true");
 
-        Map<String, String> compactOptions = new HashMap<>();
+        Map<String, String> tableOptions = new HashMap<>();
 
         List<FileStoreTable> tables = new ArrayList<>();
 
@@ -149,7 +149,7 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
                     .includingTables(null)
                     .excludingTables(null)
                     .withDatabaseCompactMode("combined")
-                    .withCompactOptions(compactOptions)
+                    .withTableOptions(tableOptions)
                     .build(env);
         }
 
@@ -184,10 +184,10 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
         options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
         options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
 
-        Map<String, String> compactOptions = new HashMap<>();
+        Map<String, String> tableOptions = new HashMap<>();
         // if CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() use default 
value, the cost time in
         // combined mode will be over 1mins
-        compactOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), 
"1s");
+        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), 
"1s");
 
         List<FileStoreTable> tables = new ArrayList<>();
         for (String dbName : DATABASE_NAMES) {
@@ -241,7 +241,7 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
                     .includingTables(null)
                     .excludingTables(null)
                     .withDatabaseCompactMode("combined")
-                    .withCompactOptions(compactOptions)
+                    .withTableOptions(tableOptions)
                     .build(env);
         }
         JobClient client = env.executeAsync();
@@ -442,8 +442,8 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.WRITE_ONLY.key(), "true");
 
-        Map<String, String> compactOptions = new HashMap<>();
-        compactOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), 
"1s");
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), 
"1s");
 
         List<FileStoreTable> compactionTables = new ArrayList<>();
         List<FileStoreTable> noCompactionTables = new ArrayList<>();
@@ -502,7 +502,7 @@ public class CompactDatabaseActionITCase extends 
CompactActionITCaseBase {
                     .includingTables(includingPattern)
                     .excludingTables(excludesPattern)
                     .withDatabaseCompactMode("combined")
-                    .withCompactOptions(compactOptions)
+                    .withTableOptions(tableOptions)
                     .build(env);
         }
         env.execute();

Reply via email to