This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 252e26ae3dfc5b78600c9a1e2f67a2e98917e55b Author: surilli <[email protected]> AuthorDate: Tue Dec 31 15:40:35 2019 +0800 [ISSUE #495] jdbc-sink-connector support divide task by queue (#496) * rocketmq-connect-jdbc add JdbcSinkTask & optimize JdbcSourceTask * jdbc-connector bug-fix duplicate data pushed in table * [ISSUE #489] JDBC Connector support divide task by topic strategy * [ISSUE #489] JDBC Connector support divide task by topic strategy * [ISSUE #495] jdbc-sink-connector support divide task by queue Co-authored-by: Xiongmengfei <[email protected]> --- pom.xml | 42 +++++-- .../rocketmq/connect/jdbc/common/ConstDefine.java | 23 ++++ .../rocketmq/connect/jdbc/common/DBUtils.java | 6 +- .../apache/rocketmq/connect/jdbc/common/Utils.java | 74 +++++++++++ .../rocketmq/connect/jdbc/config/Config.java | 4 + .../connect/jdbc/config/SinkDbConnectorConfig.java | 32 ++++- .../connect/jdbc/connector/JdbcSinkConnector.java | 135 ++++++++++++++++++++- .../connect/jdbc/strategy/DivideTaskByQueue.java | 29 +++-- 8 files changed, 318 insertions(+), 27 deletions(-) diff --git a/pom.xml b/pom.xml index 1d708f3..59442c5 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ <modelVersion>4.0.0</modelVersion> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-connect-jdbc</artifactId> - <version>1.0.0</version> + <version>0.0.1-SNAPSHOT</version> <name>connect-jdbc</name> <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-jdbc</url> @@ -40,6 +40,7 @@ <!-- Compiler settings properties --> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> + <rocketmq.version>4.5.2</rocketmq.version> </properties> <build> @@ -188,7 +189,7 @@ <dependency> <groupId>io.openmessaging</groupId> <artifactId>openmessaging-connector</artifactId> - <version>0.1.0-beta</version> + <version>0.1.1-beta-SNAPSHOT</version> </dependency> <dependency> <groupId>io.openmessaging</groupId> @@ -198,7 +199,7 @@ <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> - <version>1.2.51</version> + <version>1.2.60</version> </dependency> <dependency> <groupId>org.slf4j</groupId> @@ -217,14 +218,25 @@ </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-client</artifactId> + <version>${rocketmq.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-tools</artifactId> + <version>${rocketmq.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-remoting</artifactId> + <version>${rocketmq.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-openmessaging</artifactId> <version>4.3.2</version> </dependency> - <dependency> - <groupId>com.alibaba</groupId> - <artifactId>druid</artifactId> - <version>1.0.18</version> - </dependency> + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> @@ -239,7 +251,19 @@ <groupId>io.javalin</groupId> <artifactId>javalin</artifactId> <version>1.3.0</version> - </dependency> + </dependency> + + <dependency> + <groupId>com.github.shyiko</groupId> + <artifactId>mysql-binlog-connector-java</artifactId> + <version>0.20.1</version> + </dependency> + + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>druid</artifactId> + <version>1.0.31</version> + </dependency> </dependencies> diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java b/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java new file mode 100644 index 0000000..f49d367 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/ConstDefine.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.jdbc.common; + +public class ConstDefine { + + public static String JDBC_CONNECTOR_ADMIN_PREFIX = "JDBC-CONNECTOR-ADMIN"; + +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java b/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java index ccee96b..ab58153 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/DBUtils.java @@ -192,9 +192,9 @@ public class DBUtils { Map<String, String> map = new HashMap<>(); map.put("driverClassName", "com.mysql.cj.jdbc.Driver"); map.put("url", - "jdbc:mysql://" + config.getJdbcUrl() + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8&characterEncoding=utf8"); - map.put("username", config.getJdbcUsername()); - map.put("password", config.getJdbcPassword()); + "jdbc:mysql://" + config.getDbUrl() + ":" + config.getDbPort() + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8&characterEncoding=utf8"); + map.put("username", config.getDbUsername()); + map.put("password", config.getDbPassword()); map.put("initialSize", "1"); map.put("maxActive", "1"); map.put("maxWait", "60000"); diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/common/Utils.java b/src/main/java/org/apache/rocketmq/connect/jdbc/common/Utils.java new file mode 100644 index 0000000..5708e34 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/common/Utils.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.jdbc.common; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class Utils { + private static final Logger log = LoggerFactory.getLogger(Utils.class); + + public static String createGroupName(String prefix) { + return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString(); + } + + public static String createGroupName(String prefix, String postfix) { + return new StringBuilder().append(prefix).append("-").append(postfix).toString(); + } + + public static String createTaskId(String prefix) { + return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString(); + } + + public static String createInstanceName(String namesrvAddr) { + String[] namesrvArray = namesrvAddr.split(";"); + List<String> namesrvList = new ArrayList<>(); + for (String ns : namesrvArray) { + if (!namesrvList.contains(ns)) { + namesrvList.add(ns); + } + } + Collections.sort(namesrvList); + return String.valueOf(namesrvList.toString().hashCode()); + } + + public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic, + String cluster) throws RemotingException, MQClientException, InterruptedException { + List<BrokerData> brokerList = new ArrayList<>(); + + TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic); + if (topicRouteData.getBrokerDatas() != null) { + for (BrokerData broker : topicRouteData.getBrokerDatas()) { + if (StringUtils.equals(broker.getCluster(), cluster)) { + brokerList.add(broker); + } + } + } + return brokerList; + } + +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java index 91a3e51..ae86d3f 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java @@ -59,6 +59,10 @@ public class Config { public static final String CONN_TOPIC_NAMES = "topicNames"; public static final String CONN_DB_MODE = "mode"; + public static final String CONN_SOURCE_RMQ = "source-rocketmq"; + public static final String CONN_SOURCE_CLUSTER = "source-cluster"; + public static final String REFRESH_INTERVAL = "refresh.interval"; + /* Mode Config */ private String mode = ""; private String incrementingColumnName = ""; diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java index 851b253..3ff4f71 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java @@ -6,11 +6,17 @@ import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByQueue; import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByTopic; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; public class SinkDbConnectorConfig extends DbConnectorConfig{ private Set<String> whiteList; + private String srcNamesrvs; + private String srcCluster; + private long refreshInterval; + private Map<String, List<TaskTopicInfo>> topicRouteMap; public SinkDbConnectorConfig(){ } @@ -34,11 +40,15 @@ public class SinkDbConnectorConfig extends DbConnectorConfig{ this.dbUserName = config.getString(Config.CONN_DB_USERNAME); this.dbPassword = config.getString(Config.CONN_DB_PASSWORD); + this.srcNamesrvs = config.getString(Config.CONN_SOURCE_RMQ); + this.srcCluster = config.getString(Config.CONN_SOURCE_CLUSTER); + this.refreshInterval = config.getLong(Config.REFRESH_INTERVAL, 3); + } private void buildWhiteList(KeyValue config) { this.whiteList = new HashSet<>(); - String whiteListStr = config.getString(Config.CONN_WHITE_LIST, ""); + String whiteListStr = config.getString(Config.CONN_TOPIC_NAMES, ""); String[] wl = whiteListStr.trim().split(","); if (wl.length <= 0) throw new IllegalArgumentException("White list must be not empty."); @@ -59,6 +69,26 @@ public class SinkDbConnectorConfig extends DbConnectorConfig{ this.whiteList = whiteList; } + public String getSrcNamesrvs() { + return this.srcNamesrvs; + } + + public String getSrcCluster() { + return this.srcCluster; + } + + public long getRefreshInterval() { + return this.refreshInterval; + } + + public Map<String, List<TaskTopicInfo>> getTopicRouteMap() { + return topicRouteMap; + } + + public void setTopicRouteMap(Map<String, List<TaskTopicInfo>> topicRouteMap) { + this.topicRouteMap = topicRouteMap; + } + @Override public Set<String> getWhiteTopics() { return getWhiteList(); diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java index 935ad52..0f818ee 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java @@ -3,22 +3,69 @@ package org.apache.rocketmq.connect.jdbc.connector; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.Task; import io.openmessaging.connector.api.sink.SinkConnector; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.commons.lang3.text.StrSubstitutor; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.protocol.body.TopicList; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.connect.jdbc.common.ConstDefine; +import org.apache.rocketmq.connect.jdbc.common.Utils; import org.apache.rocketmq.connect.jdbc.config.*; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class JdbcSinkConnector extends SinkConnector { private static final Logger log = LoggerFactory.getLogger(JdbcSinkConnector.class); - private KeyValue config; private DbConnectorConfig dbConnectorConfig; private volatile boolean configValid = false; + private ScheduledExecutorService executor; + private Map<String, List<TaskTopicInfo>> topicRouteMap; - public JdbcSinkConnector(){ + private DefaultMQAdminExt srcMQAdminExt; + + private volatile boolean adminStarted; + + public JdbcSinkConnector() { + topicRouteMap = new HashMap<String, List<TaskTopicInfo>>(); dbConnectorConfig = new SinkDbConnectorConfig(); + executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("JdbcSinkConnector-SinkWatcher-%d").daemon(true).build()); + } + + private synchronized void startMQAdminTools() { + if (!configValid || adminStarted) { + return; + } + RPCHook rpcHook = null; + this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook); + this.srcMQAdminExt.setNamesrvAddr(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs()); + this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.JDBC_CONNECTOR_ADMIN_PREFIX)); + this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs())); + + try { + this.srcMQAdminExt.start(); + log.info("RocketMQ srcMQAdminExt started"); + + } catch (MQClientException e) { + log.error("Replicator start failed for `srcMQAdminExt` exception.", e); + } + + adminStarted = true; } @Override @@ -40,7 +87,80 @@ public class JdbcSinkConnector extends SinkConnector { @Override public void start() { + startMQAdminTools(); + startListener(); + } + + public void startListener() { + executor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + Map<String, List<TaskTopicInfo>> origin = topicRouteMap; + topicRouteMap = new HashMap<String, List<TaskTopicInfo>>(); + + buildRoute(); + + if (!compare(origin, topicRouteMap)) { + context.requestTaskReconfiguration(); + } + } + }, ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), TimeUnit.SECONDS); + } + public boolean compare(Map<String, List<TaskTopicInfo>> origin, Map<String, List<TaskTopicInfo>> updated) { + if (origin.size() != updated.size()) { + return false; + } + for (Map.Entry<String, List<TaskTopicInfo>> entry : origin.entrySet()) { + if (!updated.containsKey(entry.getKey())) { + return false; + } + List<TaskTopicInfo> originTasks = entry.getValue(); + List<TaskTopicInfo> updateTasks = updated.get(entry.getKey()); + if (originTasks.size() != updateTasks.size()) { + return false; + } + + if (!originTasks.containsAll(updateTasks)) { + return false; + } + } + + return true; + } + + public void buildRoute() { + String srcCluster = ((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcCluster(); + try { + for (String topic : ((SinkDbConnectorConfig) this.dbConnectorConfig).getWhiteList()) { + + // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster. + // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of + // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas. + List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster); + Set<String> brokerNameSet = new HashSet<String>(); + for (BrokerData b : brokerList) { + brokerNameSet.add(b.getBrokerName()); + } + + TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic); + if (!topicRouteMap.containsKey(topic)) { + topicRouteMap.put(topic, new ArrayList<TaskTopicInfo>()); + } + for (QueueData qd : topicRouteData.getQueueDatas()) { + if (brokerNameSet.contains(qd.getBrokerName())) { + for (int i = 0; i < qd.getReadQueueNums(); i++) { + TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, null); + topicRouteMap.get(topic).add(taskTopicInfo); + } + } + } + } + } catch (Exception e) { + log.error("Fetch topic list error.", e); + } finally { + srcMQAdminExt.shutdown(); + } } @Override @@ -70,6 +190,10 @@ public class JdbcSinkConnector extends SinkConnector { return new ArrayList<KeyValue>(); } + startMQAdminTools(); + + buildRoute(); + TaskDivideConfig tdc = new TaskDivideConfig( this.dbConnectorConfig.getDbUrl(), this.dbConnectorConfig.getDbPort(), @@ -80,6 +204,9 @@ public class JdbcSinkConnector extends SinkConnector { this.dbConnectorConfig.getTaskParallelism(), this.dbConnectorConfig.getMode() ); + + ((SinkDbConnectorConfig) this.dbConnectorConfig).setTopicRouteMap(topicRouteMap); + return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc); } } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java index 7ef5c31..797710a 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java @@ -19,10 +19,7 @@ package org.apache.rocketmq.connect.jdbc.strategy; import com.alibaba.fastjson.JSONObject; import io.openmessaging.KeyValue; import io.openmessaging.internal.DefaultKeyValue; -import org.apache.rocketmq.replicator.config.DataType; -import org.apache.rocketmq.replicator.config.TaskConfigEnum; -import org.apache.rocketmq.replicator.config.TaskDivideConfig; -import org.apache.rocketmq.replicator.config.TaskTopicInfo; +import org.apache.rocketmq.connect.jdbc.config.*; import java.util.ArrayList; import java.util.HashMap; @@ -30,12 +27,21 @@ import java.util.List; import java.util.Map; public class DivideTaskByQueue extends TaskDivideStrategy { + @Override - public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) { + public List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) { + if (dbConnectorConfig instanceof SinkDbConnectorConfig){ + return divideSinkTaskByQueue(dbConnectorConfig, tdc); + } + return null; + } + + public List<KeyValue> divideSinkTaskByQueue(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) { List<KeyValue> config = new ArrayList<KeyValue>(); int parallelism = tdc.getTaskParallelism(); Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<Integer, List<TaskTopicInfo>>(); + Map<String, List<TaskTopicInfo>> topicRouteMap = ((SinkDbConnectorConfig)dbConnectorConfig).getTopicRouteMap(); int id = -1; for (String t : topicRouteMap.keySet()) { for (TaskTopicInfo taskTopicInfo : topicRouteMap.get(t)) { @@ -49,11 +55,14 @@ public class DivideTaskByQueue extends TaskDivideStrategy { for (int i = 0; i < parallelism; i++) { KeyValue keyValue = new DefaultKeyValue(); - keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic()); - keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr()); - keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal()); - keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(queueTopicList.get(i))); - keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter()); + keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl()); + keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort()); + keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName()); + keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword()); + keyValue.put(Config.CONN_TOPIC_NAMES, JSONObject.toJSONString(queueTopicList.get(i))); + keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType()); + keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter()); + keyValue.put(Config.CONN_DB_MODE, tdc.getMode()); config.add(keyValue); }
