This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit b9802aef6a6aced1243bda42bdf6a24764327bb9 Author: Heng Du <[email protected]> AuthorDate: Mon Mar 30 12:56:18 2020 +0800 fix(jdbc-connect) removed unused class (#544) --- .../rocketmq/connect/jdbc/config/Config.java | 31 ++++++++ .../jdbc/strategy/DivideTaskByConsistentHash.java | 82 ---------------------- .../jdbc/connector/JdbcSourceConnectorTest.java | 4 +- 3 files changed, 32 insertions(+), 85 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java index ae86d3f..7b4aca3 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java @@ -78,6 +78,9 @@ public class Config { private long timestampDelayInterval = 0; private String dbTimezone = "GMT+8"; private String queueName; + private String jdbcUrl; + private String jdbcUsername; + private String jdbcPassword; private Logger log = LoggerFactory.getLogger(Config.class); public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { @@ -322,4 +325,32 @@ public class Config { public void setWhiteTable(String whiteTable) { this.whiteTable = whiteTable; } + + public void setPollInterval(long pollInterval) { + this.pollInterval = pollInterval; + } + + public String getJdbcUrl() { + return jdbcUrl; + } + + public void setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; + } + + public String getJdbcUsername() { + return jdbcUsername; + } + + public void setJdbcUsername(String jdbcUsername) { + this.jdbcUsername = jdbcUsername; + } + + public String getJdbcPassword() { + return jdbcPassword; + } + + public void setJdbcPassword(String jdbcPassword) { + this.jdbcPassword = jdbcPassword; + } } \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java deleted file mode 100644 index bac7358..0000000 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java +++ /dev/null @@ -1,82 +0,0 @@ -package org.apache.rocketmq.connect.jdbc.strategy;/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import com.alibaba.fastjson.JSONObject; -import io.openmessaging.KeyValue; -import io.openmessaging.internal.DefaultKeyValue; -import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter; -import org.apache.rocketmq.common.consistenthash.Node; -import org.apache.rocketmq.replicator.config.DataType; -import org.apache.rocketmq.replicator.config.TaskConfigEnum; -import org.apache.rocketmq.replicator.config.TaskDivideConfig; -import org.apache.rocketmq.replicator.config.TaskTopicInfo; - -import java.util.*; - -public class DivideTaskByConsistentHash extends TaskDivideStrategy { - @Override public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicMap, TaskDivideConfig tdc) { - - List<KeyValue> config = new ArrayList<>(); - int parallelism = tdc.getTaskParallelism(); - Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<>(); - int id = -1; - - Collection<ClientNode> cidNodes = new ArrayList<>(); - for (int i = 0; i < parallelism; i++) { - cidNodes.add(new ClientNode(i, Integer.toString(i))); - queueTopicList.put(i, new ArrayList<>()); - } - - ConsistentHashRouter<ClientNode> router = new ConsistentHashRouter<>(cidNodes, cidNodes.size()); - - for (String t : topicMap.keySet()) { - for (TaskTopicInfo queue : topicMap.get(t)) { - ClientNode clientNode = router.routeNode(queue.toString()); - if (clientNode != null) { - queueTopicList.get(clientNode.index).add(queue); - } - } - } - - for (int i = 0; i < parallelism; i++) { - KeyValue keyValue = new DefaultKeyValue(); - keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic()); - keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr()); - keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal()); - keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(queueTopicList.get(i))); - keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter()); - config.add(keyValue); - } - - return config; - } - - private static class ClientNode implements Node { - private final String clientID; - private final int index; - - public ClientNode(int index, String clientID) { - this.index = index; - this.clientID = clientID; - } - - @Override - public String getKey() { - return clientID; - } - } -} diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java index 1e7cf78..5d25f98 100644 --- a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java +++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java @@ -22,9 +22,7 @@ import static org.junit.Assert.assertEquals; import java.util.HashSet; import java.util.Set; -import org.apache.rocketmq.connect.jdbc.Config; -import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask; - +import org.apache.rocketmq.connect.jdbc.config.Config; import org.junit.Test; import io.openmessaging.KeyValue;
