This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c0b34e44 [flink] CompactAction support catalog config (#1069)
2c0b34e44 is described below

commit 2c0b34e445f6e45c9a0711d28ee8dbfb2996a2dc
Author: Daixinyu <[email protected]>
AuthorDate: Sat May 6 15:33:50 2023 +0800

    [flink] CompactAction support catalog config (#1069)
---
 docs/content/maintenance/write-performance.md      | 24 ++++++++-
 .../org/apache/paimon/flink/action/Action.java     | 19 ++++++++
 .../org/apache/paimon/flink/action/ActionBase.java | 11 ++---
 .../apache/paimon/flink/action/CompactAction.java  | 27 ++++++++--
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  | 57 +++++++---------------
 5 files changed, 85 insertions(+), 53 deletions(-)

diff --git a/docs/content/maintenance/write-performance.md 
b/docs/content/maintenance/write-performance.md
index 467b0bc01..1dd82cf3b 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -165,9 +165,12 @@ Run the following command to submit a compaction job for 
the table.
     /path/to/paimon-flink-**-{{< version >}}.jar \
     compact \
     --warehouse <warehouse-path> \
-    --database <database-name> \
-    --table <table-name>
+    --database <database-name> \
+    --table <table-name> \
+    [--partition <partition-name>] \
+    [--catalog-conf <paimon-catalog-conf> [--catalog-conf 
<paimon-catalog-conf> ...]]
 ```
+* `--catalog-conf` is the configuration for the Paimon catalog. Each configuration 
should be specified in the format `key=value`. See [here]({{< ref 
"maintenance/configurations" >}}) for a complete list of catalog configurations.
 
 If you submit a batch job (set `execution.runtime-mode: batch` in Flink's 
configuration), all current table files will be compacted. If you submit a 
streaming job (set `execution.runtime-mode: streaming` in Flink's 
configuration), the job will continuously monitor new changes to the table and 
perform compactions as needed.
 
@@ -177,6 +180,23 @@ If you only want to submit the compaction job and don't 
want to wait until the j
 
 {{< /hint >}}
 
+Example
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    -c org.apache.paimon.flink.action.FlinkActions \
+    /path/to/paimon-flink-**-{{< version >}}.jar \
+    compact \
+    --warehouse s3:///path/to/warehouse \
+    --database test_db \
+    --table test_table \
+    --partition dt=20221126,hh=08 \
+    --partition dt=20221127,hh=09 \
+    --catalog-conf s3.endpoint=https://****.com \
+    --catalog-conf s3.access-key=***** \
+    --catalog-conf s3.secret-key=*****
+```
+
 For more usage of the compact action, see
 
 ```bash
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
index b1dc1204c..7cb6ca8d3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
@@ -161,4 +161,23 @@ public interface Action {
             System.out.println("For detailed options of each action, run 
<action> --help");
         }
     }
+
+    static Optional<Map<String, String>> getConfigMap(MultipleParameterTool 
params, String key) {
+        if (!params.has(key)) {
+            return Optional.empty();
+        }
+
+        Map<String, String> map = new HashMap<>();
+        for (String param : params.getMultiParameter(key)) {
+            String[] kv = param.split("=");
+            if (kv.length == 2) {
+                map.put(kv[0], kv[1]);
+                continue;
+            }
+
+            System.err.println("Invalid key " + key + ". Please use format 
'key=value'");
+            return Optional.empty();
+        }
+        return Optional.of(map);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 528b68f25..de310148c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -78,12 +78,10 @@ public abstract class ActionBase implements Action {
         this(warehouse, databaseName, tableName, new Options());
     }
 
-    ActionBase(String warehouse, String databaseName, String tableName, 
Options options) {
+    ActionBase(String warehouse, String databaseName, String tableName, 
Options catalogOptions) {
+        catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
         identifier = new Identifier(databaseName, tableName);
-        catalog =
-                CatalogFactory.createCatalog(
-                        CatalogContext.create(
-                                new Options().set(CatalogOptions.WAREHOUSE, 
warehouse)));
+        catalog = 
CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
         flinkCatalog = new FlinkCatalog(catalog, catalogName, 
DEFAULT_DATABASE);
 
         env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -95,9 +93,6 @@ public abstract class ActionBase implements Action {
 
         try {
             table = catalog.getTable(identifier);
-            if (options.toMap().size() > 0) {
-                table = table.copy(options.toMap());
-            }
         } catch (Catalog.TableNotExistException e) {
             LOG.error("Table doesn't exist in given path.", e);
             System.err.println("Table doesn't exist in given path.");
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index d21d5e704..b525992b6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -36,10 +36,12 @@ import org.apache.flink.table.data.RowData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.action.Action.getConfigMap;
 import static org.apache.paimon.flink.action.Action.getPartitions;
 import static org.apache.paimon.flink.action.Action.getTablePath;
 
@@ -52,14 +54,18 @@ public class CompactAction extends ActionBase {
     private final CompactorSinkBuilder sinkBuilder;
 
     CompactAction(String warehouse, String database, String tableName) {
-        super(warehouse, database, tableName, new 
Options().set(CoreOptions.WRITE_ONLY, false));
+        this(warehouse, database, tableName, new Options());
+    }
+
+    CompactAction(String warehouse, String database, String tableName, Options 
catalogOptions) {
+        super(warehouse, database, tableName, catalogOptions);
         if (!(table instanceof FileStoreTable)) {
             throw new UnsupportedOperationException(
                     String.format(
                             "Only FileStoreTable supports compact action. The 
table type is '%s'.",
                             table.getClass().getName()));
         }
-
+        table = 
table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
         sourceBuilder =
                 new CompactorSourceBuilder(identifier.getFullName(), 
(FileStoreTable) table);
         sinkBuilder = new CompactorSinkBuilder((FileStoreTable) table);
@@ -104,7 +110,12 @@ public class CompactAction extends ActionBase {
             return Optional.empty();
         }
 
-        CompactAction action = new CompactAction(tablePath.f0, tablePath.f1, 
tablePath.f2);
+        Optional<Map<String, String>> catalogConfigOption = 
getConfigMap(params, "catalog-conf");
+        Options catalogOptions =
+                
Options.fromMap(catalogConfigOption.orElse(Collections.emptyMap()));
+
+        CompactAction action =
+                new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2, 
catalogOptions);
 
         if (params.has("partition")) {
             List<Map<String, String>> partitions = getPartitions(params);
@@ -127,6 +138,9 @@ public class CompactAction extends ActionBase {
         System.out.println(
                 "  compact --warehouse <warehouse-path> --database 
<database-name> "
                         + "--table <table-name> [--partition 
<partition-name>]");
+        System.out.println(
+                "  compact --warehouse s3://path/to/warehouse --database 
<database-name> "
+                        + "--table <table-name> [--catalog-conf 
<paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]");
         System.out.println("  compact --path <table-path> [--partition 
<partition-name>]");
         System.out.println();
 
@@ -142,6 +156,13 @@ public class CompactAction extends ActionBase {
         System.out.println(
                 "  compact --warehouse hdfs:///path/to/warehouse --database 
test_db --table test_table "
                         + "--partition dt=20221126,hh=08 --partition 
dt=20221127,hh=09");
+        System.out.println(
+                "  compact --warehouse s3:///path/to/warehouse "
+                        + "--database test_db "
+                        + "--table test_table "
+                        + "--catalog-conf s3.endpoint=https://****.com "
+                        + "--catalog-conf s3.access-key=***** "
+                        + "--catalog-conf s3.secret-key=***** ");
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index dcd2c9d5c..d39702383 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -50,12 +50,12 @@ import java.sql.ResultSet;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.regex.Pattern;
 
+import static org.apache.paimon.flink.action.Action.getConfigMap;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -338,45 +338,22 @@ public class MySqlSyncDatabaseAction implements Action {
         String includingTables = params.get("including-tables");
         String excludingTables = params.get("excluding-tables");
 
-        Map<String, String> mySqlConfig = getConfigMap(params, "mysql-conf");
-        Map<String, String> catalogConfig = getConfigMap(params, 
"catalog-conf");
-        Map<String, String> tableConfig = getConfigMap(params, "table-conf");
-        if (mySqlConfig == null) {
-            return Optional.empty();
-        }
-
-        return Optional.of(
-                new MySqlSyncDatabaseAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        ignoreIncompatible,
-                        tablePrefix,
-                        tableSuffix,
-                        includingTables,
-                        excludingTables,
-                        catalogConfig == null ? Collections.emptyMap() : 
catalogConfig,
-                        tableConfig == null ? Collections.emptyMap() : 
tableConfig));
-    }
-
-    private static Map<String, String> getConfigMap(MultipleParameterTool 
params, String key) {
-        if (!params.has(key)) {
-            return null;
-        }
-
-        Map<String, String> map = new HashMap<>();
-        for (String param : params.getMultiParameter(key)) {
-            String[] kv = param.split("=");
-            if (kv.length == 2) {
-                map.put(kv[0], kv[1]);
-                continue;
-            }
-
-            System.err.println(
-                    "Invalid " + key + " " + param + ".\nRun 
mysql-sync-database --help for help.");
-            return null;
-        }
-        return map;
+        Optional<Map<String, String>> mySqlConfigOption = getConfigMap(params, 
"mysql-conf");
+        Optional<Map<String, String>> catalogConfigOption = 
getConfigMap(params, "catalog-conf");
+        Optional<Map<String, String>> tableConfigOption = getConfigMap(params, 
"table-conf");
+        return mySqlConfigOption.map(
+                mySqlConfig ->
+                        new MySqlSyncDatabaseAction(
+                                mySqlConfig,
+                                warehouse,
+                                database,
+                                ignoreIncompatible,
+                                tablePrefix,
+                                tableSuffix,
+                                includingTables,
+                                excludingTables,
+                                
catalogConfigOption.orElse(Collections.emptyMap()),
+                                
tableConfigOption.orElse(Collections.emptyMap())));
     }
 
     private static void printHelp() {

Reply via email to