This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 341d6f356047b0fe908a2aad1ca901cdd7317aef Author: Xiongmengfei <[email protected]> AuthorDate: Tue Dec 31 15:21:39 2019 +0800 [ISSUE #489] JDBC Connector support divide task by topic strategy (#490) * [ISSUE #489] JDBC Connector support divide task by topic strategy * [ISSUE #489] JDBC Connector support divide task by topic strategy --- .../rocketmq/connect/jdbc/config/Config.java | 70 +++++++++---- .../rocketmq/connect/jdbc/config/DataType.java | 26 +++++ .../connect/jdbc/config/DbConnectorConfig.java | 84 ++++++++++++++++ .../connect/jdbc/config/SinkDbConnectorConfig.java | 67 ++++++++++++ .../jdbc/config/SourceDbConnectorConfig.java | 73 ++++++++++++++ .../connect/jdbc/config/TaskDivideConfig.java | 112 +++++++++++++++++++++ .../connect/jdbc/config/TaskTopicInfo.java | 37 +++++++ .../connect/jdbc/connector/JdbcSinkConnector.java | 39 +++++-- .../connect/jdbc/connector/JdbcSinkTask.java | 1 + .../jdbc/connector/JdbcSourceConnector.java | 37 +++++-- .../connect/jdbc/connector/JdbcSourceTask.java | 13 +-- .../connect/jdbc/strategy/DivideStrategyEnum.java | 23 +++++ .../jdbc/strategy/DivideTaskByConsistentHash.java | 82 +++++++++++++++ .../connect/jdbc/strategy/DivideTaskByQueue.java | 62 ++++++++++++ .../connect/jdbc/strategy/DivideTaskByTopic.java | 104 +++++++++++++++++++ .../connect/jdbc/strategy/TaskDivideStrategy.java | 32 ++++++ 16 files changed, 822 insertions(+), 40 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java index 5491d43..91a3e51 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.connect.jdbc.config; +import com.alibaba.fastjson.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,9 +29,11 @@ public class Config { private static final Logger LOG = LoggerFactory.getLogger(Config.class); /* Database Connection Config */ - private String jdbcUrl; - private String jdbcUsername; - private String jdbcPassword; + private String dbUrl; + private String dbPort; + private String dbUsername; + private String dbPassword; + private String dataType; private String rocketmqTopic; private String jdbcBackoff; private String jdbcAttempts; @@ -44,6 +47,18 @@ public class Config { private String whiteDataBase; private String whiteTable; + public static final String CONN_TASK_PARALLELISM = "task-parallelism"; + public static final String CONN_TASK_DIVIDE_STRATEGY = "task-divide-strategy"; + public static final String CONN_WHITE_LIST = "whiteDataBase"; + public static final String CONN_SOURCE_RECORD_CONVERTER = "source-record-converter"; + public static final String CONN_DB_IP = "dbUrl"; + public static final String CONN_DB_PORT = "dbPort"; + public static final String CONN_DB_USERNAME = "dbUsername"; + public static final String CONN_DB_PASSWORD = "dbPassword"; + public static final String CONN_DATA_TYPE = "dataType"; + public static final String CONN_TOPIC_NAMES = "topicNames"; + public static final String CONN_DB_MODE = "mode"; + /* Mode Config */ private String mode = ""; private String incrementingColumnName = ""; @@ -57,15 +72,16 @@ public class Config { private int batchMaxRows = 100; private long tablePollInterval = 60000; private long timestampDelayInterval = 0; - private String dbTimezone = "UTC"; + private String dbTimezone = "GMT+8"; private String queueName; private Logger log = LoggerFactory.getLogger(Config.class); public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { { - add("jdbcUrl"); - add("jdbcUsername"); - add("jdbcPassword"); + add("dbUrl"); + add("dbPort"); + add("dbUsername"); + add("dbPassword"); add("mode"); add("rocketmqTopic"); } @@ -79,28 +95,44 @@ public class Config { this.queueName = queueName; } - public String getJdbcUrl() { - return jdbcUrl; + public String getDbUrl() { + return dbUrl; + } + + public void setDbUrl(String dbUrl) { + this.dbUrl = dbUrl; + } + + public String getDbPort() { + return dbPort; + } + + public void setDbPort(String dbPort) { + this.dbPort = dbPort; + } + + public String getDbUsername() { + return dbUsername; } - public void setJdbcUrl(String jdbcUrl) { - this.jdbcUrl = jdbcUrl; + public void setDbUsername(String dbUsername) { + this.dbUsername = dbUsername; } - public String getJdbcUsername() { - return jdbcUsername; + public String getDbPassword() { + return dbPassword; } - public void setJdbcUsername(String jdbcUsername) { - this.jdbcUsername = jdbcUsername; + public void setDbPassword(String dbPassword) { + this.dbPassword = dbPassword; } - public String getJdbcPassword() { - return jdbcPassword; + public String getDataType() { + return dataType; } - public void setJdbcPassword(String jdbcPassword) { - this.jdbcPassword = jdbcPassword; + public void setDataType(String dataType) { + this.dataType = dataType; } public String getRocketmqTopic() { diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/DataType.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/DataType.java new file mode 100644 index 0000000..ef7408a --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/DataType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.jdbc.config; + +public enum DataType { + + COMMON_MESSAGE, + TOPIC_CONFIG, + BROKER_CONFIG, + SUB_CONFIG, + OFFSET +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/DbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/DbConnectorConfig.java new file mode 100644 index 0000000..43bd165 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/DbConnectorConfig.java @@ -0,0 +1,84 @@ +package org.apache.rocketmq.connect.jdbc.config; + +import io.openmessaging.KeyValue; +import org.apache.rocketmq.connect.jdbc.strategy.TaskDivideStrategy; + +public abstract class DbConnectorConfig { + + public TaskDivideStrategy taskDivideStrategy; + public String dbUrl; + public String dbPort; + public String dbUserName; + public String dbPassword; + public String converter; + public int taskParallelism; + public String mode; + + public abstract void validate(KeyValue config); + + public abstract <T> T getWhiteTopics(); + + public TaskDivideStrategy getTaskDivideStrategy() { + return taskDivideStrategy; + } + + public void setTaskDivideStrategy(TaskDivideStrategy taskDivideStrategy) { + this.taskDivideStrategy = taskDivideStrategy; + } + + public String getDbUrl() { + return dbUrl; + } + + public void setDbUrl(String dbUrl) { + this.dbUrl = dbUrl; + } + + public String getDbPort() { + return dbPort; + } + + public void setDbPort(String dbPort) { + this.dbPort = dbPort; + } + + public String getDbUserName() { + return dbUserName; + } + + public void setDbUserName(String dbUserName) { + this.dbUserName = dbUserName; + } + + public String getDbPassword() { + return dbPassword; + } + + public void setDbPassword(String dbPassword) { + this.dbPassword = dbPassword; + } + + public String getConverter() { + return converter; + } + + public void setConverter(String converter) { + this.converter = converter; + } + + public int getTaskParallelism() { + return taskParallelism; + } + + public void setTaskParallelism(int taskParallelism) { + this.taskParallelism = taskParallelism; + } + + public String getMode() { + return mode; + } + + public void setMode(String mode) { + this.mode = mode; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java new file mode 100644 index 0000000..851b253 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java @@ -0,0 +1,67 @@ +package org.apache.rocketmq.connect.jdbc.config; + +import io.openmessaging.KeyValue; +import org.apache.rocketmq.connect.jdbc.strategy.DivideStrategyEnum; +import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByQueue; +import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByTopic; + +import java.util.HashSet; +import java.util.Set; + +public class SinkDbConnectorConfig extends DbConnectorConfig{ + + private Set<String> whiteList; + + public SinkDbConnectorConfig(){ + } + + @Override + public void validate(KeyValue config) { + this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 0); + + int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal()); + if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) { + this.taskDivideStrategy = new DivideTaskByQueue(); + } else { + this.taskDivideStrategy = new DivideTaskByTopic(); + } + + buildWhiteList(config); + + this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER); + this.dbUrl = config.getString(Config.CONN_DB_IP); + this.dbPort = config.getString(Config.CONN_DB_PORT); + this.dbUserName = config.getString(Config.CONN_DB_USERNAME); + this.dbPassword = config.getString(Config.CONN_DB_PASSWORD); + + } + + private void buildWhiteList(KeyValue config) { + this.whiteList = new HashSet<>(); + String whiteListStr = config.getString(Config.CONN_WHITE_LIST, ""); + String[] wl = whiteListStr.trim().split(","); + if (wl.length <= 0) + throw new IllegalArgumentException("White list must be not empty."); + else { + this.whiteList.clear(); + for (String t : wl) { + this.whiteList.add(t.trim()); + } + } + } + + + public Set<String> getWhiteList() { + return whiteList; + } + + public void setWhiteList(Set<String> whiteList) { + this.whiteList = whiteList; + } + + @Override + public Set<String> getWhiteTopics() { + return getWhiteList(); + } + +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java new file mode 100644 index 0000000..801e411 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java @@ -0,0 +1,73 @@ +package org.apache.rocketmq.connect.jdbc.config; + +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.KeyValue; +import org.apache.rocketmq.connect.jdbc.strategy.DivideStrategyEnum; +import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByQueue; +import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByTopic; + +import java.util.HashMap; +import java.util.Map; + +public class SourceDbConnectorConfig extends DbConnectorConfig{ + + private Map<String, String> whiteMap; + + public SourceDbConnectorConfig(){ + } + + @Override + public void validate(KeyValue config) { + this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 0); + + int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal()); + if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) { + this.taskDivideStrategy = new DivideTaskByQueue(); + } else { + this.taskDivideStrategy = new DivideTaskByTopic(); + } + + buildWhiteMap(config); + + this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER); + this.dbUrl = config.getString(Config.CONN_DB_IP); + this.dbPort = config.getString(Config.CONN_DB_PORT); + this.dbUserName = config.getString(Config.CONN_DB_USERNAME); + this.dbPassword = config.getString(Config.CONN_DB_PASSWORD); + this.mode = config.getString(Config.CONN_DB_MODE, "bulk"); + + } + + private void buildWhiteMap(KeyValue config) { + this.whiteMap = new HashMap<>(16); + String whiteListStr = config.getString(Config.CONN_WHITE_LIST, ""); + JSONObject whiteDataBaseObject = JSONObject.parseObject(whiteListStr); + if(whiteDataBaseObject.keySet().size() <= 0){ + throw new IllegalArgumentException("white data base must be not empty."); + }else { + this.whiteMap.clear(); + for (String dbName : whiteDataBaseObject.keySet()){ + JSONObject whiteTableObject = (JSONObject) whiteDataBaseObject.get(dbName); + for (String tableName : whiteTableObject.keySet()){ + String dbTableKey = dbName + "-" + tableName; + this.whiteMap.put(dbTableKey, whiteTableObject.getString(tableName)); + } + } + } + } + + + public Map<String, String> getWhiteMap() { + return whiteMap; + } + + public void setWhiteMap(Map<String, String> whiteMap) { + this.whiteMap = whiteMap; + } + + @Override + public Map<String, String> getWhiteTopics() { + return getWhiteMap(); + } + +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskDivideConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskDivideConfig.java new file mode 100644 index 0000000..8b15a2f --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskDivideConfig.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.jdbc.config; + +public class TaskDivideConfig { + + private String dbUrl; + + private String dbPort; + + private String dbUserName; + + private String dbPassword; + + private String srcRecordConverter; + + private int dataType; + + private int taskParallelism; + + private String mode; + + public TaskDivideConfig(String dbUrl, String dbPort, String dbUserName, String dbPassword, String srcRecordConverter, + int dataType, int taskParallelism, String mode) { + this.dbUrl = dbUrl; + this.dbPort = dbPort; + this.dbUserName = dbUserName; + this.dbPassword = dbPassword; + this.srcRecordConverter = srcRecordConverter; + this.dataType = dataType; + this.taskParallelism = taskParallelism; + this.mode = mode; + } + + public String getDbUrl() { + return dbUrl; + } + + public void setDbUrl(String dbUrl) { + this.dbUrl = dbUrl; + } + + public String getDbPort() { + return dbPort; + } + + public void setDbPort(String dbPort) { + this.dbPort = dbPort; + } + + public String getDbUserName() { + return dbUserName; + } + + public void setDbUserName(String dbUserName) { + this.dbUserName = dbUserName; + } + + public String getDbPassword() { + return dbPassword; + } + + public void setDbPassword(String dbPassword) { + this.dbPassword = dbPassword; + } + + public String getSrcRecordConverter() { + return srcRecordConverter; + } + + public void setSrcRecordConverter(String srcRecordConverter) { + this.srcRecordConverter = srcRecordConverter; + } + + public int getDataType() { + return dataType; + } + + public void setDataType(int dataType) { + this.dataType = dataType; + } + + public int getTaskParallelism() { + return taskParallelism; + } + + public void setTaskParallelism(int taskParallelism) { + this.taskParallelism = taskParallelism; + } + + public String getMode() { + return mode; + } + + public void setMode(String mode) { + this.mode = mode; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskTopicInfo.java new file mode 100644 index 0000000..5c2a21e --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskTopicInfo.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.jdbc.config; + +import org.apache.rocketmq.common.message.MessageQueue; + +public class TaskTopicInfo extends MessageQueue { + + private String targetTopic; + + public TaskTopicInfo(String sourceTopic, String brokerName, int queueId, String targetTopic) { + super(sourceTopic, brokerName, queueId); + this.targetTopic = targetTopic; + } + + public String getTargetTopic() { + return this.targetTopic; + } + + public void setTargetTopic(String targetTopic) { + this.targetTopic = targetTopic; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java index e1d1256..935ad52 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java @@ -3,15 +3,23 @@ package org.apache.rocketmq.connect.jdbc.connector; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.Task; import io.openmessaging.connector.api.sink.SinkConnector; -import org.apache.rocketmq.connect.jdbc.config.Config; +import org.apache.rocketmq.connect.jdbc.config.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; public class JdbcSinkConnector extends SinkConnector { - + private static final Logger log = LoggerFactory.getLogger(JdbcSinkConnector.class); private KeyValue config; + private DbConnectorConfig dbConnectorConfig; + private volatile boolean configValid = false; + + public JdbcSinkConnector(){ + dbConnectorConfig = new SinkDbConnectorConfig(); + } @Override public String verifyAndSetConfig(KeyValue config) { @@ -20,7 +28,13 @@ public class JdbcSinkConnector extends SinkConnector { return "Request config key: " + requestKey; } } - this.config = config; + try { + this.dbConnectorConfig.validate(config); + } catch (IllegalArgumentException e) { + return e.getMessage(); + } + this.configValid = true; + return ""; } @@ -51,8 +65,21 @@ public class JdbcSinkConnector extends SinkConnector { @Override public List<KeyValue> taskConfigs() { - List<KeyValue> config = new ArrayList<>(); - config.add(this.config); - return config; + log.info("List.start"); + if (!configValid) { + return new ArrayList<KeyValue>(); + } + + TaskDivideConfig tdc = new TaskDivideConfig( + this.dbConnectorConfig.getDbUrl(), + this.dbConnectorConfig.getDbPort(), + this.dbConnectorConfig.getDbUserName(), + this.dbConnectorConfig.getDbPassword(), + this.dbConnectorConfig.getConverter(), + DataType.COMMON_MESSAGE.ordinal(), + this.dbConnectorConfig.getTaskParallelism(), + this.dbConnectorConfig.getMode() + ); + return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc); } } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java index 4b55e5a..31f43e3 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java @@ -114,6 +114,7 @@ public class JdbcSinkTask extends SinkTask { try { if (connection != null){ connection.close(); + log.info("jdbc sink task connection is closed."); } } catch (Throwable e) { log.warn("sink task stop error while closing connection to {}", "jdbc", e); diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java index 796f0e6..a083e84 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java @@ -20,7 +20,7 @@ package org.apache.rocketmq.connect.jdbc.connector; import java.util.ArrayList; import java.util.List; -import org.apache.rocketmq.connect.jdbc.config.Config; +import org.apache.rocketmq.connect.jdbc.config.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,19 +30,29 @@ import io.openmessaging.connector.api.source.SourceConnector; public class JdbcSourceConnector extends SourceConnector { private static final Logger log = LoggerFactory.getLogger(JdbcSourceConnector.class); - private KeyValue config; + private DbConnectorConfig dbConnectorConfig; + private volatile boolean configValid = false; + + public JdbcSourceConnector() { + dbConnectorConfig = new SourceDbConnectorConfig(); + } @Override public String verifyAndSetConfig(KeyValue config) { - log.info("1216123 JdbcSourceConnector verifyAndSetConfig enter"); + log.info("JdbcSourceConnector verifyAndSetConfig enter"); for (String requestKey : Config.REQUEST_CONFIG) { if (!config.containsKey(requestKey)) { return "Request config key: " + requestKey; } } - this.config = config; + try { + this.dbConnectorConfig.validate(config); + } catch (IllegalArgumentException e) { + return e.getMessage(); + } + this.configValid = true; return ""; } @@ -75,8 +85,21 @@ public class JdbcSourceConnector extends SourceConnector { @Override public List<KeyValue> taskConfigs() { log.info("List.start"); - List<KeyValue> config = new ArrayList<>(); - config.add(this.config); - return config; + if (!configValid) { + return new ArrayList<KeyValue>(); + } + + TaskDivideConfig tdc = new TaskDivideConfig( + this.dbConnectorConfig.getDbUrl(), + this.dbConnectorConfig.getDbPort(), + this.dbConnectorConfig.getDbUserName(), + this.dbConnectorConfig.getDbPassword(), + this.dbConnectorConfig.getConverter(), + DataType.COMMON_MESSAGE.ordinal(), + this.dbConnectorConfig.getTaskParallelism(), + this.dbConnectorConfig.getMode() + ); + return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc); } + } diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java index 943d432..d533395 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java @@ -22,11 +22,7 @@ import io.openmessaging.connector.api.source.SourceTask; import java.nio.ByteBuffer; import java.sql.Connection; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Timer; +import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -82,7 +78,7 @@ public class JdbcSourceTask extends SourceTask { querier = tableQueue.poll(1000, TimeUnit.MILLISECONDS); else querier = tableQueue.peek(); - Timer timer = new java.util.Timer(); + Timer timer = new Timer(); try { Thread.currentThread(); Thread.sleep(1000);//毫秒 @@ -114,7 +110,7 @@ public class JdbcSourceTask extends SourceTask { } SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry( - ByteBuffer.wrap(config.getJdbcUrl().getBytes("UTF-8")), + ByteBuffer.wrap((config.getDbUrl() + config.getDbPort()).getBytes("UTF-8")), ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))); res.add(sourceDataEntry); log.debug("sourceDataEntry : {}", JSONObject.toJSONString(sourceDataEntry)); @@ -163,8 +159,9 @@ public class JdbcSourceTask extends SourceTask { @Override public void stop() { try { - if (connection != null){ + if (connection != null) { connection.close(); + log.info("jdbc source task connection is closed."); } } catch (Throwable e) { log.warn("source task stop error while closing connection to {}", "jdbc", e); diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideStrategyEnum.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideStrategyEnum.java new file mode 100644 index 0000000..0afa470 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideStrategyEnum.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.jdbc.strategy; + +public enum DivideStrategyEnum { + + BY_TOPIC, + BY_QUEUE +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java new file mode 100644 index 0000000..bac7358 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java @@ -0,0 +1,82 @@ +package org.apache.rocketmq.connect.jdbc.strategy;/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.KeyValue; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter; +import org.apache.rocketmq.common.consistenthash.Node; +import org.apache.rocketmq.replicator.config.DataType; +import org.apache.rocketmq.replicator.config.TaskConfigEnum; +import org.apache.rocketmq.replicator.config.TaskDivideConfig; +import org.apache.rocketmq.replicator.config.TaskTopicInfo; + +import java.util.*; + +public class DivideTaskByConsistentHash extends TaskDivideStrategy { + @Override public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicMap, TaskDivideConfig tdc) { + + List<KeyValue> config = new ArrayList<>(); + int parallelism = tdc.getTaskParallelism(); + Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<>(); + int id = -1; + + Collection<ClientNode> cidNodes = new ArrayList<>(); + for (int i = 0; i < parallelism; i++) { + cidNodes.add(new ClientNode(i, Integer.toString(i))); + queueTopicList.put(i, new ArrayList<>()); + } + + ConsistentHashRouter<ClientNode> router = new ConsistentHashRouter<>(cidNodes, cidNodes.size()); + + for (String t : topicMap.keySet()) { + for (TaskTopicInfo queue : topicMap.get(t)) { + ClientNode clientNode = router.routeNode(queue.toString()); + if (clientNode != null) { + queueTopicList.get(clientNode.index).add(queue); + } + } + } + + for (int i = 0; i < parallelism; i++) { + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic()); + keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr()); + keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal()); + keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(queueTopicList.get(i))); + keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter()); + config.add(keyValue); + } + + return config; + } + + private static class ClientNode implements Node { + private final String clientID; + private final int index; + + public ClientNode(int index, String clientID) { + this.index = index; + this.clientID = clientID; + } + + @Override + public String getKey() { + return clientID; + } + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java new file mode 100644 index 0000000..7ef5c31 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.jdbc.strategy; + +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.KeyValue; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.replicator.config.DataType; +import org.apache.rocketmq.replicator.config.TaskConfigEnum; +import org.apache.rocketmq.replicator.config.TaskDivideConfig; +import org.apache.rocketmq.replicator.config.TaskTopicInfo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DivideTaskByQueue extends TaskDivideStrategy { + @Override + public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) { + + List<KeyValue> config = new ArrayList<KeyValue>(); + int parallelism = tdc.getTaskParallelism(); + Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<Integer, List<TaskTopicInfo>>(); + int id = -1; + for (String t : topicRouteMap.keySet()) { + for (TaskTopicInfo taskTopicInfo : topicRouteMap.get(t)) { + int ind = ++id % parallelism; + if (!queueTopicList.containsKey(ind)) { + queueTopicList.put(ind, new ArrayList<TaskTopicInfo>()); + } + queueTopicList.get(ind).add(taskTopicInfo); + } + } + + for (int i = 0; i < parallelism; i++) { + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic()); + keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr()); + keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal()); + keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(queueTopicList.get(i))); + keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter()); + config.add(keyValue); + } + + return config; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java new file mode 100644 index 0000000..762c7a0 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.jdbc.strategy; + +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.Task; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.connect.jdbc.config.*; +import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask; + +import java.util.*; + +public class DivideTaskByTopic extends TaskDivideStrategy { + @Override + public List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) { + if (dbConnectorConfig instanceof SourceDbConnectorConfig){ + return divideSourceTaskByTopic(dbConnectorConfig, tdc); + }else { + return divideSinkTaskByTopic(dbConnectorConfig, tdc); + } + + } + + private List<KeyValue> divideSourceTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) { + List<KeyValue> config = new ArrayList<KeyValue>(); + int parallelism = tdc.getTaskParallelism(); + int id = -1; + Map<String, String> topicRouteMap = ((SourceDbConnectorConfig)dbConnectorConfig).getWhiteTopics(); + Map<Integer, Map<String, Map<String, String>>> taskTopicList = new HashMap<>(); + for (Map.Entry<String, String> entry : topicRouteMap.entrySet()) { + int ind = ++id % parallelism; + if (!taskTopicList.containsKey(ind)) { + taskTopicList.put(ind, new HashMap<>()); + } + String dbKey = entry.getKey().split("-")[0]; + String tableKey = entry.getKey().split("-")[1]; + String filter = entry.getValue(); + Map<String, String> tableMap = new HashMap<>(); + tableMap.put(tableKey, filter); + taskTopicList.get(ind).put(dbKey, tableMap); + } + + for (int i = 0; i < parallelism; i++) { + KeyValue keyValue = new DefaultKeyValue(); + + keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl()); + keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort()); + keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName()); + keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword()); + keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskTopicList.get(i))); + keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType()); + keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter()); + keyValue.put(Config.CONN_DB_MODE, tdc.getMode()); + config.add(keyValue); + } + + return config; + } + + private List<KeyValue> divideSinkTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) { + List<KeyValue> config = new ArrayList<KeyValue>(); + int parallelism = tdc.getTaskParallelism(); + int id = -1; + Set<String> topicRouteSet = ((SinkDbConnectorConfig)dbConnectorConfig).getWhiteTopics(); + Map<Integer, String> taskTopicList = new HashMap<>(); + for (String topicName : topicRouteSet) { + int ind = ++id % parallelism; + if (!taskTopicList.containsKey(ind)) { + taskTopicList.put(ind, topicName); + } + } + + for (int i = 0; i < parallelism; i++) { + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl()); + keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort()); + keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName()); + keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword()); + keyValue.put(Config.CONN_TOPIC_NAMES, JSONObject.toJSONString(taskTopicList.get(i))); + keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType()); + keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter()); + keyValue.put(Config.CONN_DB_MODE, tdc.getMode()); + config.add(keyValue); + } + + return config; + } + +} diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/TaskDivideStrategy.java new file mode 100644 index 0000000..736fcac --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/TaskDivideStrategy.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.jdbc.strategy; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.Task; +import org.apache.rocketmq.connect.jdbc.config.DbConnectorConfig; +import org.apache.rocketmq.connect.jdbc.config.TaskDivideConfig; +import org.apache.rocketmq.connect.jdbc.config.TaskTopicInfo; + +import java.util.List; +import java.util.Map; + +public abstract class TaskDivideStrategy { + + public abstract List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc); + +}
