This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit b425260296d1524d61643b3e286d395df1ed5835 Author: Dreaouth <[email protected]> AuthorDate: Sun Sep 20 19:09:29 2020 +0800 [ISSUE #570] ASoC connect runtime optimization: CLI (#622) feature(rocketmq-runtime) add CLI support for rocketmq-connect-runtime * Add CLI * Fix checkstyle * Optimize CLI structure * Add README.md * Rename CLI * Update pom.xml * Optimize the connectors and tasks format * Fix newline format --- .../connect/jdbc/connector/JdbcSourceConnector.java | 2 +- .../connect/jdbc/strategy/DivideTaskByTopic.java | 17 ++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java index a083e84..ee62133 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java @@ -59,7 +59,7 @@ public class JdbcSourceConnector extends SourceConnector { @Override public void start() { - + log.info("JdbcSourceConnector start"); } @Override diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java index c1d5020..5762795 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java @@ -41,21 +41,23 @@ public class DivideTaskByTopic extends TaskDivideStrategy { int parallelism = tdc.getTaskParallelism(); int id = -1; Map<String, String> topicRouteMap = ((SourceDbConnectorConfig)dbConnectorConfig).getWhiteTopics(); - Map<Integer, Map<String, Map<String, String>>> taskTopicList = new HashMap<>(); + Map<Integer, String> taskTopicList = new HashMap<>(); + Map<Integer, Map<String, Map<String, String>>> taskWhiteList = new HashMap<>(); for (Map.Entry<String, String> entry : topicRouteMap.entrySet()) { int ind = ++id % parallelism; - if (!taskTopicList.containsKey(ind)) { - taskTopicList.put(ind, new HashMap<>()); + if (!taskWhiteList.containsKey(ind)) { + taskWhiteList.put(ind, new HashMap<>()); } String dbKey = entry.getKey().split("-")[0]; String tableKey = entry.getKey().split("-")[1]; + taskTopicList.put(ind, tableKey); String filter = entry.getValue(); Map<String, String> tableMap = new HashMap<>(); tableMap.put(tableKey, filter); - if(!taskTopicList.get(ind).containsKey(dbKey)){ - taskTopicList.get(ind).put(dbKey, tableMap); + if(!taskWhiteList.get(ind).containsKey(dbKey)){ + taskWhiteList.get(ind).put(dbKey, tableMap); }else { - taskTopicList.get(ind).get(dbKey).putAll(tableMap); + taskWhiteList.get(ind).get(dbKey).putAll(tableMap); } } @@ -66,7 +68,8 @@ public class DivideTaskByTopic extends TaskDivideStrategy { keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort()); keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName()); keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword()); - keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskTopicList.get(i))); + keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskWhiteList.get(i))); + keyValue.put(Config.CONN_TOPIC_NAMES, taskTopicList.get(i)); keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType()); keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter()); keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
