This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8eb2f136 [improve]cdc tools parameter removes leading and trailing
spaces (#447)
8eb2f136 is described below
commit 8eb2f136263b9975af857077665be3eed5fc28e9
Author: wudongliang <[email protected]>
AuthorDate: Fri Jul 26 16:10:56 2024 +0800
[improve]cdc tools parameter removes leading and trailing spaces (#447)
---
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 35 +++++-----
.../apache/doris/flink/tools/cdc/CdcToolsTest.java | 74 ++++++++++++++++++++--
2 files changed, 85 insertions(+), 24 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index b62f0f52..c91a0734 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -46,24 +46,25 @@ public class CdcTools {
System.out.println("Input args: " + Arrays.asList(args) + ".\n");
String operation = args[0].toLowerCase();
String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
switch (operation) {
case DatabaseSyncConfig.MYSQL_SYNC_DATABASE:
- createMySQLSyncDatabase(opArgs);
+ createMySQLSyncDatabase(params);
break;
case DatabaseSyncConfig.ORACLE_SYNC_DATABASE:
- createOracleSyncDatabase(opArgs);
+ createOracleSyncDatabase(params);
break;
case DatabaseSyncConfig.POSTGRES_SYNC_DATABASE:
- createPostgresSyncDatabase(opArgs);
+ createPostgresSyncDatabase(params);
break;
case DatabaseSyncConfig.SQLSERVER_SYNC_DATABASE:
- createSqlServerSyncDatabase(opArgs);
+ createSqlServerSyncDatabase(params);
break;
case DatabaseSyncConfig.MONGODB_SYNC_DATABASE:
- createMongoDBSyncDatabase(opArgs);
+ createMongoDBSyncDatabase(params);
break;
case DatabaseSyncConfig.DB2_SYNC_DATABASE:
- createDb2SyncDatabase(opArgs);
+ createDb2SyncDatabase(params);
break;
default:
System.out.println("Unknown operation " + operation);
@@ -71,8 +72,7 @@ public class CdcTools {
}
}
- private static void createMySQLSyncDatabase(String[] opArgs) throws
Exception {
- MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ private static void createMySQLSyncDatabase(MultipleParameterTool params)
throws Exception {
Preconditions.checkArgument(params.has(DatabaseSyncConfig.MYSQL_CONF));
Map<String, String> mysqlMap = getConfigMap(params,
DatabaseSyncConfig.MYSQL_CONF);
Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
@@ -80,8 +80,7 @@ public class CdcTools {
syncDatabase(params, databaseSync, mysqlConfig, SourceConnector.MYSQL);
}
- private static void createOracleSyncDatabase(String[] opArgs) throws
Exception {
- MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ private static void createOracleSyncDatabase(MultipleParameterTool params)
throws Exception {
Preconditions.checkArgument(params.has(DatabaseSyncConfig.ORACLE_CONF));
Map<String, String> oracleMap = getConfigMap(params,
DatabaseSyncConfig.ORACLE_CONF);
Configuration oracleConfig = Configuration.fromMap(oracleMap);
@@ -89,8 +88,7 @@ public class CdcTools {
syncDatabase(params, databaseSync, oracleConfig,
SourceConnector.ORACLE);
}
- private static void createPostgresSyncDatabase(String[] opArgs) throws
Exception {
- MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ private static void createPostgresSyncDatabase(MultipleParameterTool
params) throws Exception {
Preconditions.checkArgument(params.has(DatabaseSyncConfig.POSTGRES_CONF));
Map<String, String> postgresMap = getConfigMap(params,
DatabaseSyncConfig.POSTGRES_CONF);
Configuration postgresConfig = Configuration.fromMap(postgresMap);
@@ -98,8 +96,7 @@ public class CdcTools {
syncDatabase(params, databaseSync, postgresConfig,
SourceConnector.POSTGRES);
}
- private static void createSqlServerSyncDatabase(String[] opArgs) throws
Exception {
- MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ private static void createSqlServerSyncDatabase(MultipleParameterTool
params) throws Exception {
Preconditions.checkArgument(params.has(DatabaseSyncConfig.SQLSERVER_CONF));
Map<String, String> postgresMap = getConfigMap(params,
DatabaseSyncConfig.SQLSERVER_CONF);
Configuration postgresConfig = Configuration.fromMap(postgresMap);
@@ -107,8 +104,7 @@ public class CdcTools {
syncDatabase(params, databaseSync, postgresConfig,
SourceConnector.SQLSERVER);
}
- private static void createMongoDBSyncDatabase(String[] opArgs) throws
Exception {
- MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ private static void createMongoDBSyncDatabase(MultipleParameterTool
params) throws Exception {
Preconditions.checkArgument(params.has(DatabaseSyncConfig.MONGODB_CONF));
Map<String, String> mongoMap = getConfigMap(params,
DatabaseSyncConfig.MONGODB_CONF);
Configuration mongoConfig = Configuration.fromMap(mongoMap);
@@ -116,8 +112,7 @@ public class CdcTools {
syncDatabase(params, databaseSync, mongoConfig,
SourceConnector.MONGODB);
}
- private static void createDb2SyncDatabase(String[] opArgs) throws
Exception {
- MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ private static void createDb2SyncDatabase(MultipleParameterTool params)
throws Exception {
Preconditions.checkArgument(params.has(DatabaseSyncConfig.DB2_CONF));
Map<String, String> db2Map = getConfigMap(params,
DatabaseSyncConfig.DB2_CONF);
Configuration db2Config = Configuration.fromMap(db2Map);
@@ -197,10 +192,10 @@ public class CdcTools {
for (String param : params.getMultiParameter(key)) {
String[] kv = param.split("=", 2);
if (kv.length == 2) {
- map.put(kv[0], kv[1]);
+ map.put(kv[0].trim(), kv[1].trim());
continue;
} else if (kv.length == 1 && EMPTY_KEYS.contains(kv[0])) {
- map.put(kv[0], "");
+ map.put(kv[0].trim(), "");
continue;
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java
index 5944ba52..41bcb621 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcToolsTest.java
@@ -22,8 +22,14 @@ import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
public class CdcToolsTest {
@@ -32,21 +38,81 @@ public class CdcToolsTest {
MultipleParameterTool params =
MultipleParameterTool.fromArgs(
new String[] {
- "--sink-conf", "fenodes=127.0.0.1:8030",
"--sink-conf", "password="
+ "--sink-conf",
+ "fenodes = 127.0.0.1:8030",
+ "--sink-conf",
+ "password=",
+ "--sink-conf",
+ "jdbc-url= jdbc:mysql://127.0.0.1:9030 ",
+ "--sink-conf",
+ "sink.label-prefix = label "
});
- Map<String, String> sinkConf = CdcTools.getConfigMap(params,
"sink-conf");
+ Map<String, String> sinkConf = CdcTools.getConfigMap(params,
DatabaseSyncConfig.SINK_CONF);
Map<String, String> excepted = new HashMap<>();
excepted.put("password", "");
excepted.put("fenodes", "127.0.0.1:8030");
+ excepted.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
+ excepted.put("sink.label-prefix", "label");
Assert.assertEquals(sinkConf, excepted);
- Map<String, String> mysqlConf = CdcTools.getConfigMap(params,
"--mysql-conf");
+ Map<String, String> mysqlConf =
+ CdcTools.getConfigMap(params, DatabaseSyncConfig.MYSQL_CONF);
Assert.assertNull(mysqlConf);
MultipleParameterTool params2 =
MultipleParameterTool.fromArgs(new String[] {"--sink-conf",
"fenodes"});
- Map<String, String> sinkConf2 = CdcTools.getConfigMap(params2,
"sink-conf");
+ Map<String, String> sinkConf2 =
+ CdcTools.getConfigMap(params2, DatabaseSyncConfig.SINGLE_SINK);
Assert.assertNull(sinkConf2);
}
+
+ @Test
+ public void testGetConfigMap() {
+ Map<String, Collection<String>> config = new HashMap<>();
+ config.put(
+ DatabaseSyncConfig.MYSQL_CONF, Arrays.asList(" hostname=127.0.0.1", " port=3306"));
+ config.put(
+ DatabaseSyncConfig.POSTGRES_CONF,
+ Arrays.asList("hostname=127.0.0.1 ", "port=5432 "));
+ config.put(
+ DatabaseSyncConfig.SINK_CONF,
+ Arrays.asList(" fenodes=127.0.0.1:8030 ", " username=root"));
+ config.put(DatabaseSyncConfig.TABLE_CONF, Collections.singletonList(" replication_num=1"));
+ MultipleParameterTool parameter =
MultipleParameterTool.fromMultiMap(config);
+ Map<String, String> mysqlConfigMap =
+ CdcTools.getConfigMap(parameter,
DatabaseSyncConfig.MYSQL_CONF);
+ Map<String, String> postGresConfigMap =
+ CdcTools.getConfigMap(parameter,
DatabaseSyncConfig.POSTGRES_CONF);
+ Map<String, String> sinkConfigMap =
+ CdcTools.getConfigMap(parameter, DatabaseSyncConfig.SINK_CONF);
+ Map<String, String> tableConfigMap =
+ CdcTools.getConfigMap(parameter,
DatabaseSyncConfig.TABLE_CONF);
+
+ Set<String> mysqlKeyConf = new HashSet<>(Arrays.asList("hostname",
"port"));
+ Set<String> mysqlValueConf = new HashSet<>(Arrays.asList("127.0.0.1",
"3306"));
+ assertEquals(mysqlConfigMap, mysqlKeyConf, mysqlValueConf);
+
+ Set<String> postgresKeyConf = new HashSet<>(Arrays.asList("hostname",
"port"));
+ Set<String> postgresValueConf = new
HashSet<>(Arrays.asList("127.0.0.1", "5432"));
+ assertEquals(postGresConfigMap, postgresKeyConf, postgresValueConf);
+
+ Set<String> sinkKeyConf = new HashSet<>(Arrays.asList("fenodes",
"username"));
+ Set<String> sinkValueConf = new
HashSet<>(Arrays.asList("127.0.0.1:8030", "root"));
+ assertEquals(sinkConfigMap, sinkKeyConf, sinkValueConf);
+
+ Set<String> tableKeyConf = new
HashSet<>(Collections.singletonList("replication_num"));
+ Set<String> tableValueConf = new
HashSet<>(Collections.singletonList("1"));
+ assertEquals(tableConfigMap, tableKeyConf, tableValueConf);
+ }
+
+ private void assertEquals(
+ Map<String, String> actualMap, Set<String> keyConf, Set<String>
valueConf) {
+ for (Entry<String, String> entry : actualMap.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ Assert.assertTrue(keyConf.contains(key));
+ Assert.assertTrue(valueConf.contains(value));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]