This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 0b56e472982adb92ba7fa52bf2d3c9b5f326e969 Author: mike_xwm <[email protected]> AuthorDate: Wed Apr 1 13:54:08 2020 +0800 [ISSUE #545]bug fix (#546) Co-authored-by: MajorHe1 <[email protected]> --- pom.xml | 2 + .../rocketmq/connect/jdbc/common/CloneUtils.java | 28 +++++++++++ .../rocketmq/connect/jdbc/common/ConstDefine.java | 2 +- .../rocketmq/connect/jdbc/common/DBUtils.java | 2 +- .../rocketmq/connect/jdbc/config/Config.java | 31 ------------ .../connect/jdbc/config/SinkDbConnectorConfig.java | 11 +++-- .../jdbc/config/SourceDbConnectorConfig.java | 4 +- .../connect/jdbc/connector/JdbcSinkConnector.java | 56 ++++++++++++---------- .../connect/jdbc/connector/JdbcSourceTask.java | 9 ++-- .../apache/rocketmq/connect/jdbc/sink/Updater.java | 22 +++++---- .../rocketmq/connect/jdbc/source/Querier.java | 7 ++- .../jdbc/source/TimestampIncrementingQuerier.java | 6 +-- .../connect/jdbc/strategy/DivideTaskByQueue.java | 3 +- .../connect/jdbc/strategy/DivideTaskByTopic.java | 14 ++++-- 14 files changed, 107 insertions(+), 90 deletions(-) diff --git a/pom.xml b/pom.xml index 9df23c4..61680f1 100644 --- a/pom.xml +++ b/pom.xml @@ -190,6 +190,7 @@ <groupId>io.openmessaging</groupId> <artifactId>openmessaging-connector</artifactId> <version>0.1.1</version> + <scope>provided</scope> </dependency> <dependency> <groupId>io.openmessaging</groupId> @@ -264,6 +265,7 @@ <artifactId>druid</artifactId> <version>1.0.31</version> </dependency> + </dependencies> </project> diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/CloneUtils.java b/src/main/java/org/apache/rocketmq/connect/jdbc/common/CloneUtils.java new file mode 100644 index 0000000..f0ff98e --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/CloneUtils.java @@ -0,0 +1,28 @@ +package org.apache.rocketmq.connect.jdbc.common; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +public class CloneUtils { + @SuppressWarnings("unchecked") + public static <T extends Serializable> T clone(T obj) { + T clonedObj = null; + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(obj); + oos.close(); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + ObjectInputStream ois = new ObjectInputStream(bais); + clonedObj = (T) ois.readObject(); + ois.close(); + } catch (Exception e) { + e.printStackTrace(); + } + return clonedObj; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java b/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java index f49d367..e6d2f7a 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java @@ -19,5 +19,5 @@ package org.apache.rocketmq.connect.jdbc.common; public class ConstDefine { public static String JDBC_CONNECTOR_ADMIN_PREFIX = "JDBC-CONNECTOR-ADMIN"; - + public static final String PREFIX = "jdbc"; } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java b/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java index ab58153..31a86d1 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java @@ -196,7 +196,7 @@ public class DBUtils { map.put("username", config.getDbUsername()); map.put("password", config.getDbPassword()); map.put("initialSize", "1"); - map.put("maxActive", "1"); + map.put("maxActive", "2"); map.put("maxWait", "60000"); map.put("timeBetweenEvictionRunsMillis", "60000"); map.put("minEvictableIdleTimeMillis", "300000"); diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java index cca1aa5..9162bad 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java @@ -78,9 +78,6 @@ public class Config { private long timestampDelayInterval = 0; private String dbTimezone = "GMT+8"; private String queueName; - private String jdbcUrl; - private String jdbcUsername; - private String jdbcPassword; private Logger log = LoggerFactory.getLogger(Config.class); public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { @@ -327,32 +324,4 @@ public class Config { public void setWhiteTable(String whiteTable) { this.whiteTable = whiteTable; } - - public void setPollInterval(long pollInterval) { - this.pollInterval = pollInterval; - } - - public String getJdbcUrl() { - return jdbcUrl; - } - - public void setJdbcUrl(String jdbcUrl) { - this.jdbcUrl = jdbcUrl; - } - - public String getJdbcUsername() { - return jdbcUsername; - } - - public void setJdbcUsername(String jdbcUsername) { - this.jdbcUsername = jdbcUsername; - } - - public String getJdbcPassword() { - return jdbcPassword; - } - - public void setJdbcPassword(String jdbcPassword) { - this.jdbcPassword = jdbcPassword; - } } \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java index 3ff4f71..26b1541 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java @@ -16,16 +16,16 @@ public class SinkDbConnectorConfig extends DbConnectorConfig{ private String srcNamesrvs; private String srcCluster; private long refreshInterval; - private Map<String, List<TaskTopicInfo>> topicRouteMap; + private Map<String, Set<TaskTopicInfo>> topicRouteMap; public SinkDbConnectorConfig(){ } @Override public void validate(KeyValue config) { - this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 0); + this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1); - int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal()); + int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal()); if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) { this.taskDivideStrategy = new DivideTaskByQueue(); } else { @@ -43,6 +43,7 @@ public class SinkDbConnectorConfig extends DbConnectorConfig{ this.srcNamesrvs = config.getString(Config.CONN_SOURCE_RMQ); this.srcCluster = config.getString(Config.CONN_SOURCE_CLUSTER); this.refreshInterval = config.getLong(Config.REFRESH_INTERVAL, 3); + this.mode = config.getString(Config.CONN_DB_MODE, "bulk"); } @@ -81,11 +82,11 @@ public class SinkDbConnectorConfig extends DbConnectorConfig{ return this.refreshInterval; } - public Map<String, List<TaskTopicInfo>> getTopicRouteMap() { + public Map<String, Set<TaskTopicInfo>> getTopicRouteMap() { return topicRouteMap; } - public void setTopicRouteMap(Map<String, List<TaskTopicInfo>> topicRouteMap) { + public void setTopicRouteMap(Map<String, Set<TaskTopicInfo>> topicRouteMap) { this.topicRouteMap = topicRouteMap; } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java index 801e411..4972739 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java @@ -18,9 +18,9 @@ public class SourceDbConnectorConfig extends DbConnectorConfig{ @Override public void validate(KeyValue config) { - this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 0); + this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1); - int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal()); + int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal()); if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) { this.taskDivideStrategy = new DivideTaskByQueue(); } else { diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java index 0f818ee..6a41646 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java @@ -3,46 +3,48 @@ package org.apache.rocketmq.connect.jdbc.connector; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.Task; import io.openmessaging.connector.api.sink.SinkConnector; -import org.apache.commons.lang3.StringUtils; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.concurrent.BasicThreadFactory; -import org.apache.commons.lang3.text.StrSubstitutor; import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.connect.jdbc.common.CloneUtils; import org.apache.rocketmq.connect.jdbc.common.ConstDefine; import org.apache.rocketmq.connect.jdbc.common.Utils; -import org.apache.rocketmq.connect.jdbc.config.*; +import org.apache.rocketmq.connect.jdbc.config.Config; +import org.apache.rocketmq.connect.jdbc.config.DataType; +import org.apache.rocketmq.connect.jdbc.config.DbConnectorConfig; +import org.apache.rocketmq.connect.jdbc.config.SinkDbConnectorConfig; +import org.apache.rocketmq.connect.jdbc.config.TaskDivideConfig; +import org.apache.rocketmq.connect.jdbc.config.TaskTopicInfo; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - public class JdbcSinkConnector extends SinkConnector { private static final Logger log = LoggerFactory.getLogger(JdbcSinkConnector.class); private DbConnectorConfig dbConnectorConfig; private volatile boolean configValid = false; private ScheduledExecutorService executor; - private Map<String, List<TaskTopicInfo>> topicRouteMap; + private HashMap<String, Set<TaskTopicInfo>> topicRouteMap; private DefaultMQAdminExt srcMQAdminExt; private volatile boolean adminStarted; public JdbcSinkConnector() { - topicRouteMap = new HashMap<String, List<TaskTopicInfo>>(); + topicRouteMap = new HashMap<>(); dbConnectorConfig = new SinkDbConnectorConfig(); executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("JdbcSinkConnector-SinkWatcher-%d").daemon(true).build()); } @@ -93,30 +95,34 @@ public class JdbcSinkConnector extends SinkConnector { public void startListener() { executor.scheduleAtFixedRate(new Runnable() { + boolean first = true; + HashMap<String, Set<TaskTopicInfo>> origin = null; + @Override public void run() { - Map<String, List<TaskTopicInfo>> origin = topicRouteMap; - topicRouteMap = new HashMap<String, List<TaskTopicInfo>>(); - buildRoute(); - + if (first) { + origin = CloneUtils.clone(topicRouteMap); + first = false; + } if (!compare(origin, topicRouteMap)) { context.requestTaskReconfiguration(); + origin = CloneUtils.clone(topicRouteMap); } } }, ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), TimeUnit.SECONDS); } - public boolean compare(Map<String, List<TaskTopicInfo>> origin, Map<String, List<TaskTopicInfo>> updated) { + public boolean compare(Map<String, Set<TaskTopicInfo>> origin, Map<String, Set<TaskTopicInfo>> updated) { if (origin.size() != updated.size()) { return false; } - for (Map.Entry<String, List<TaskTopicInfo>> entry : origin.entrySet()) { + for (Map.Entry<String, Set<TaskTopicInfo>> entry : origin.entrySet()) { if (!updated.containsKey(entry.getKey())) { return false; } - List<TaskTopicInfo> originTasks = entry.getValue(); - List<TaskTopicInfo> updateTasks = updated.get(entry.getKey()); + Set<TaskTopicInfo> originTasks = entry.getValue(); + Set<TaskTopicInfo> updateTasks = updated.get(entry.getKey()); if (originTasks.size() != updateTasks.size()) { return false; } @@ -145,7 +151,7 @@ public class JdbcSinkConnector extends SinkConnector { TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic); if (!topicRouteMap.containsKey(topic)) { - topicRouteMap.put(topic, new ArrayList<TaskTopicInfo>()); + topicRouteMap.put(topic, new HashSet<>(16)); } for (QueueData qd : topicRouteData.getQueueDatas()) { if (brokerNameSet.contains(qd.getBrokerName())) { diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java index d533395..f36623f 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java @@ -21,12 +21,15 @@ package org.apache.rocketmq.connect.jdbc.connector; import io.openmessaging.connector.api.source.SourceTask; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.connect.jdbc.common.ConstDefine; import org.apache.rocketmq.connect.jdbc.config.Config; import org.apache.rocketmq.connect.jdbc.common.DBUtils; import org.apache.rocketmq.connect.jdbc.config.ConfigUtil; @@ -105,13 +108,13 @@ public class JdbcSourceTask extends SourceTask { .entryType(EntryType.UPDATE); for (int i = 0; i < dataRow.getColList().size(); i++) { Object[] value = new Object[2]; - value[0] = value[1] = dataRow.getDataList().get(i); + value[0] = value[1] = dataRow.getParserList().get(i).getValue(dataRow.getDataList().get(i)); dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSONObject.toJSONString(value)); } SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry( - ByteBuffer.wrap((config.getDbUrl() + config.getDbPort()).getBytes("UTF-8")), - ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))); + ByteBuffer.wrap((ConstDefine.PREFIX + config.getDbUrl() + config.getDbPort()).getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))); res.add(sourceDataEntry); log.debug("sourceDataEntry : {}", JSONObject.toJSONString(sourceDataEntry)); } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java index e30c65f..9feffe6 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java @@ -49,7 +49,7 @@ public class Updater { isSuccess = true; // 再查原有数据是否存在,存在则删除 beforeUpdateId = queryBeforeUpdateRowId(dbName, tableName, fieldMap); - if (beforeUpdateId != 0){ + if (beforeUpdateId != 0 && afterUpdateId != beforeUpdateId){ isSuccess = deleteRow(dbName, tableName, beforeUpdateId); } break; @@ -107,7 +107,7 @@ public class Updater { ResultSet rs; PreparedStatement stmt; Boolean finishQuery = false; - String query = "select id from " + dbName + "." + tableName + " where "; + String query = "select id from " + dbName + "." + tableName + " where 1=1"; for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) { count ++; @@ -116,7 +116,7 @@ public class Updater { Object fieldValue = entry.getValue()[0]; if ("id".equals(fieldName)) continue; - if (count != 1) { + if (count <=fieldMap.size()) { query += " and "; } if (fieldValue == null) @@ -150,7 +150,7 @@ public class Updater { ResultSet rs; PreparedStatement stmt; Boolean finishQuery = false; - String query = "select id from " + dbName + "." + tableName + " where "; + String query = "select id from " + dbName + "." + tableName + " where 1=1"; for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) { count ++; @@ -159,7 +159,7 @@ public class Updater { Object fieldValue = entry.getValue()[1]; if ("id".equals(fieldName)) continue; - if (count != 1) { + if (count <=fieldMap.size()) { query += " and "; } if (fieldValue == null) @@ -200,19 +200,21 @@ public class Updater { FieldType fieldType = entry.getKey().getType(); Object fieldValue = entry.getValue()[1]; if ("id".equals(fieldName)) { - if (id == 0) + if (id == 0){ + if(count==fieldMap.size()) update = update.substring(0,update.length()-1); continue; - else + }else{ fieldValue = id; - } - if (count != 1) { - update += ", "; + } } if (fieldValue == null) { update += fieldName + " = NULL"; } else { update = typeParser(fieldType, fieldName, fieldValue, update); } + if(count<fieldMap.size()){ + update += ","; + } } try { diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java index d2544f9..03447a8 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java @@ -92,12 +92,12 @@ public class Querier { public void poll() { try { PreparedStatement stmt; - StringBuilder query = new StringBuilder("select * from "); LinkedList<Table> tableLinkedList = new LinkedList<>(); for (Map.Entry<String, Database> entry : schema.getDbMap().entrySet()) { String db = entry.getKey(); Iterator<Map.Entry<String, Table>> iterator = entry.getValue().getTableMap().entrySet().iterator(); while (iterator.hasNext()) { + StringBuilder query = new StringBuilder("select * from "); Map.Entry<String, Table> tableEntry = iterator.next(); String tb = tableEntry.getKey(); query.append(db + "." + tb); @@ -116,7 +116,7 @@ public class Querier { query.append(condition); } } - stmt = connection.prepareStatement(query + db + "." + tb); + stmt = connection.prepareStatement(query.toString()); ResultSet rs; rs = stmt.executeQuery(); List<String> colList = tableEntry.getValue().getColList(); @@ -158,7 +158,7 @@ public class Querier { for (String whiteTableName : whiteTableObject.keySet()){ Collections.addAll(whiteTableSet, whiteTableName); HashMap<String, String> filterMap = new HashMap<>(); - JSONObject tableFilterObject = (JSONObject)whiteTableObject.get(whiteTableName); + JSONObject tableFilterObject = JSONObject.parseObject(whiteTableObject.get(whiteTableName).toString()); for(String filterKey : tableFilterObject.keySet()){ filterMap.put(filterKey, tableFilterObject.getString(filterKey)); } @@ -170,5 +170,4 @@ public class Querier { schema.load(); log.info("load schema success"); } - } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java index 964322d..0ab72df 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingQuerier.java @@ -288,9 +288,9 @@ public class TimestampIncrementingQuerier extends Querier { incrementingColumn = config.getIncrementingColumnName(); map.put("driverClassName", "com.mysql.cj.jdbc.Driver"); map.put("url", - "jdbc:mysql://" + config.getJdbcUrl() + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8"); - map.put("username", config.getJdbcUsername()); - map.put("password", config.getJdbcPassword()); + "jdbc:mysql://" + config.getDbUrl() + ":" + config.getDbPort() +"?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8"); + map.put("username", config.getDbUsername()); + map.put("password", config.getDbPassword()); map.put("initialSize", "2"); map.put("maxActive", "2"); map.put("maxWait", "60000"); diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java index 797710a..9d23fd2 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.connect.jdbc.strategy; import com.alibaba.fastjson.JSONObject; import io.openmessaging.KeyValue; import io.openmessaging.internal.DefaultKeyValue; +import java.util.Set; import org.apache.rocketmq.connect.jdbc.config.*; import java.util.ArrayList; @@ -41,7 +42,7 @@ public class DivideTaskByQueue extends TaskDivideStrategy { List<KeyValue> config = new ArrayList<KeyValue>(); int parallelism = tdc.getTaskParallelism(); Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<Integer, List<TaskTopicInfo>>(); - Map<String, List<TaskTopicInfo>> topicRouteMap = ((SinkDbConnectorConfig)dbConnectorConfig).getTopicRouteMap(); + Map<String, Set<TaskTopicInfo>> topicRouteMap = ((SinkDbConnectorConfig)dbConnectorConfig).getTopicRouteMap(); int id = -1; for (String t : topicRouteMap.keySet()) { for (TaskTopicInfo taskTopicInfo : topicRouteMap.get(t)) { diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java index 762c7a0..c1d5020 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java @@ -52,7 +52,11 @@ public class DivideTaskByTopic extends TaskDivideStrategy { String filter = entry.getValue(); Map<String, String> tableMap = new HashMap<>(); tableMap.put(tableKey, filter); - taskTopicList.get(ind).put(dbKey, tableMap); + if(!taskTopicList.get(ind).containsKey(dbKey)){ + taskTopicList.get(ind).put(dbKey, tableMap); + }else { + taskTopicList.get(ind).get(dbKey).putAll(tableMap); + } } for (int i = 0; i < parallelism; i++) { @@ -77,11 +81,13 @@ public class DivideTaskByTopic extends TaskDivideStrategy { int parallelism = tdc.getTaskParallelism(); int id = -1; Set<String> topicRouteSet = ((SinkDbConnectorConfig)dbConnectorConfig).getWhiteTopics(); - Map<Integer, String> taskTopicList = new HashMap<>(); + Map<Integer, StringBuilder> taskTopicList = new HashMap<>(); for (String topicName : topicRouteSet) { int ind = ++id % parallelism; if (!taskTopicList.containsKey(ind)) { - taskTopicList.put(ind, topicName); + taskTopicList.put(ind, new StringBuilder(topicName)); + }else { + taskTopicList.get(ind).append(",").append(topicName); } } @@ -91,7 +97,7 @@ public class DivideTaskByTopic extends TaskDivideStrategy { keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort()); keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName()); keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword()); - keyValue.put(Config.CONN_TOPIC_NAMES, JSONObject.toJSONString(taskTopicList.get(i))); + keyValue.put(Config.CONN_TOPIC_NAMES, taskTopicList.get(i).toString()); keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType()); keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter()); keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
