This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2c0b34e44 [flink] CompactAction support catalog config (#1069)
2c0b34e44 is described below
commit 2c0b34e445f6e45c9a0711d28ee8dbfb2996a2dc
Author: Daixinyu <[email protected]>
AuthorDate: Sat May 6 15:33:50 2023 +0800
[flink] CompactAction support catalog config (#1069)
---
docs/content/maintenance/write-performance.md | 24 ++++++++-
.../org/apache/paimon/flink/action/Action.java | 19 ++++++++
.../org/apache/paimon/flink/action/ActionBase.java | 11 ++---
.../apache/paimon/flink/action/CompactAction.java | 27 ++++++++--
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 57 +++++++---------------
5 files changed, 85 insertions(+), 53 deletions(-)
diff --git a/docs/content/maintenance/write-performance.md
b/docs/content/maintenance/write-performance.md
index 467b0bc01..1dd82cf3b 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -165,9 +165,12 @@ Run the following command to submit a compaction job for
the table.
/path/to/paimon-flink-**-{{< version >}}.jar \
compact \
--warehouse <warehouse-path> \
- --database <database-name> \
- --table <table-name>
+ --database <database-name> \
+ --table <table-name> \
+ [--partition <partition-name> [--partition <partition-name> ...]] \
+ [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]
```
+* `--catalog-conf` is the configuration for the Paimon catalog. Each configuration should be specified in the format `key=value`; repeat the option to pass several configurations. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of catalog configurations.
If you submit a batch job (set `execution.runtime-mode: batch` in Flink's
configuration), all current table files will be compacted. If you submit a
streaming job (set `execution.runtime-mode: streaming` in Flink's
configuration), the job will continuously monitor new changes to the table and
perform compactions as needed.
@@ -177,6 +180,23 @@ If you only want to submit the compaction job and don't
want to wait until the j
{{< /hint >}}
+Example
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ -c org.apache.paimon.flink.action.FlinkActions \
+ /path/to/paimon-flink-**-{{< version >}}.jar \
+ compact \
+ --warehouse s3://bucket/path/to/warehouse \
+ --database test_db \
+ --table test_table \
+ --partition dt=20221126,hh=08 \
+ --partition dt=20221127,hh=09 \
+ --catalog-conf s3.endpoint=https://****.com \
+ --catalog-conf s3.access-key=***** \
+ --catalog-conf s3.secret-key=*****
+```
+
For more usage of the compact action, see
```bash
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
index b1dc1204c..7cb6ca8d3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
@@ -161,4 +161,23 @@ public interface Action {
System.out.println("For detailed options of each action, run
<action> --help");
}
}
+
+ /**
+ * Parses every value of a repeatable {@code --<key> k=v} command-line argument into a map.
+ *
+ * <p>Each value is split at the first '=' only, so values that themselves contain '=' (for
+ * example URLs with query strings or '='-padded secrets) are preserved intact. Later
+ * occurrences of the same config key overwrite earlier ones.
+ *
+ * @param params parsed command-line parameters
+ * @param key name of the repeatable argument, e.g. {@code catalog-conf}
+ * @return empty if the argument is absent, or if any value is malformed (a message naming
+ *     the offending value is printed to stderr in that case); otherwise the parsed pairs
+ */
+ static Optional<Map<String, String>> getConfigMap(MultipleParameterTool params, String key) {
+ if (!params.has(key)) {
+ return Optional.empty();
+ }
+
+ Map<String, String> map = new HashMap<>();
+ for (String param : params.getMultiParameter(key)) {
+ // Limit 2: split only at the first '=' so the value part may itself contain '='.
+ String[] kv = param.split("=", 2);
+ if (kv.length != 2 || kv[0].isEmpty() || kv[1].isEmpty()) {
+ System.err.println(
+ "Invalid " + key + " '" + param + "'. Please use format 'key=value'.");
+ return Optional.empty();
+ }
+ map.put(kv[0], kv[1]);
+ }
+ return Optional.of(map);
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 528b68f25..de310148c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -78,12 +78,10 @@ public abstract class ActionBase implements Action {
this(warehouse, databaseName, tableName, new Options());
}
- ActionBase(String warehouse, String databaseName, String tableName,
Options options) {
+ ActionBase(String warehouse, String databaseName, String tableName,
Options catalogOptions) {
+ catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
identifier = new Identifier(databaseName, tableName);
- catalog =
- CatalogFactory.createCatalog(
- CatalogContext.create(
- new Options().set(CatalogOptions.WAREHOUSE,
warehouse)));
+ catalog =
CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
flinkCatalog = new FlinkCatalog(catalog, catalogName,
DEFAULT_DATABASE);
env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -95,9 +93,6 @@ public abstract class ActionBase implements Action {
try {
table = catalog.getTable(identifier);
- if (options.toMap().size() > 0) {
- table = table.copy(options.toMap());
- }
} catch (Catalog.TableNotExistException e) {
LOG.error("Table doesn't exist in given path.", e);
System.err.println("Table doesn't exist in given path.");
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index d21d5e704..b525992b6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -36,10 +36,12 @@ import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static org.apache.paimon.flink.action.Action.getConfigMap;
import static org.apache.paimon.flink.action.Action.getPartitions;
import static org.apache.paimon.flink.action.Action.getTablePath;
@@ -52,14 +54,18 @@ public class CompactAction extends ActionBase {
private final CompactorSinkBuilder sinkBuilder;
CompactAction(String warehouse, String database, String tableName) {
- super(warehouse, database, tableName, new
Options().set(CoreOptions.WRITE_ONLY, false));
+ this(warehouse, database, tableName, new Options());
+ }
+
+ CompactAction(String warehouse, String database, String tableName, Options
catalogOptions) {
+ super(warehouse, database, tableName, catalogOptions);
if (!(table instanceof FileStoreTable)) {
throw new UnsupportedOperationException(
String.format(
"Only FileStoreTable supports compact action. The
table type is '%s'.",
table.getClass().getName()));
}
-
+ table =
table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
sourceBuilder =
new CompactorSourceBuilder(identifier.getFullName(),
(FileStoreTable) table);
sinkBuilder = new CompactorSinkBuilder((FileStoreTable) table);
@@ -104,7 +110,12 @@ public class CompactAction extends ActionBase {
return Optional.empty();
}
- CompactAction action = new CompactAction(tablePath.f0, tablePath.f1,
tablePath.f2);
+ Optional<Map<String, String>> catalogConfigOption =
getConfigMap(params, "catalog-conf");
+ Options catalogOptions =
+
Options.fromMap(catalogConfigOption.orElse(Collections.emptyMap()));
+
+ CompactAction action =
+ new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2,
catalogOptions);
if (params.has("partition")) {
List<Map<String, String>> partitions = getPartitions(params);
@@ -127,6 +138,9 @@ public class CompactAction extends ActionBase {
System.out.println(
" compact --warehouse <warehouse-path> --database
<database-name> "
+ "--table <table-name> [--partition
<partition-name>]");
+ System.out.println(
+ " compact --warehouse s3://path/to/warehouse --database
<database-name> "
+ + "--table <table-name> [--catalog-conf
<paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]");
System.out.println(" compact --path <table-path> [--partition
<partition-name>]");
System.out.println();
@@ -142,6 +156,13 @@ public class CompactAction extends ActionBase {
System.out.println(
" compact --warehouse hdfs:///path/to/warehouse --database
test_db --table test_table "
+ "--partition dt=20221126,hh=08 --partition
dt=20221127,hh=09");
+ System.out.println(
+ " compact --warehouse s3:///path/to/warehouse "
+ + "--database test_db "
+ + "--table test_table "
+ + "--catalog-conf s3.endpoint=https://****.com "
+ + "--catalog-conf s3.access-key=***** "
+ + "--catalog-conf s3.secret-key=***** ");
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index dcd2c9d5c..d39702383 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -50,12 +50,12 @@ import java.sql.ResultSet;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
+import static org.apache.paimon.flink.action.Action.getConfigMap;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -338,45 +338,22 @@ public class MySqlSyncDatabaseAction implements Action {
String includingTables = params.get("including-tables");
String excludingTables = params.get("excluding-tables");
- Map<String, String> mySqlConfig = getConfigMap(params, "mysql-conf");
- Map<String, String> catalogConfig = getConfigMap(params,
"catalog-conf");
- Map<String, String> tableConfig = getConfigMap(params, "table-conf");
- if (mySqlConfig == null) {
- return Optional.empty();
- }
-
- return Optional.of(
- new MySqlSyncDatabaseAction(
- mySqlConfig,
- warehouse,
- database,
- ignoreIncompatible,
- tablePrefix,
- tableSuffix,
- includingTables,
- excludingTables,
- catalogConfig == null ? Collections.emptyMap() :
catalogConfig,
- tableConfig == null ? Collections.emptyMap() :
tableConfig));
- }
-
- private static Map<String, String> getConfigMap(MultipleParameterTool
params, String key) {
- if (!params.has(key)) {
- return null;
- }
-
- Map<String, String> map = new HashMap<>();
- for (String param : params.getMultiParameter(key)) {
- String[] kv = param.split("=");
- if (kv.length == 2) {
- map.put(kv[0], kv[1]);
- continue;
- }
-
- System.err.println(
- "Invalid " + key + " " + param + ".\nRun
mysql-sync-database --help for help.");
- return null;
- }
- return map;
+ Optional<Map<String, String>> mySqlConfigOption = getConfigMap(params,
"mysql-conf");
+ Optional<Map<String, String>> catalogConfigOption =
getConfigMap(params, "catalog-conf");
+ Optional<Map<String, String>> tableConfigOption = getConfigMap(params,
"table-conf");
+ return mySqlConfigOption.map(
+ mySqlConfig ->
+ new MySqlSyncDatabaseAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ ignoreIncompatible,
+ tablePrefix,
+ tableSuffix,
+ includingTables,
+ excludingTables,
+
catalogConfigOption.orElse(Collections.emptyMap()),
+
tableConfigOption.orElse(Collections.emptyMap())));
}
private static void printHelp() {