This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit e74c022c593a99b5c79d8e8a774254e7ef44f01a Author: sanchen <[email protected]> AuthorDate: Sun Oct 6 16:42:42 2019 +0800 [ISSUE #420]remove openmessage-runtime dependency --- README.md | 2 +- pom.xml | 5 -- .../connect/kafka/config/ConfigDefine.java | 67 ++++++++++++++++++++++ .../kafka/{Config.java => config/ConfigUtil.java} | 58 ++----------------- .../kafka/connector/KafkaSourceConnector.java | 35 +++++------ .../connect/kafka/connector/KafkaSourceTask.java | 8 +-- src/main/resources/connect-kafka-source.properties | 1 - .../kafka/connector/KafkaSourceConnectorTest.java | 15 ++--- 8 files changed, 104 insertions(+), 87 deletions(-) diff --git a/README.md b/README.md index d0917cb..213c8fa 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ **启动Connector** -http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","oms-driver-url":"oms: rocketmq://127.0.0.1:9876/default:default","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0.0.1:9092","source-record-converter":"io.openmessaging.connect.runtime.converter.JsonConverter"} +http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0.0.1:9092","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"} **查看Connector运行状态** diff --git a/pom.xml b/pom.xml index 08712ca..ccc4cc1 100644 --- a/pom.xml +++ b/pom.xml @@ -158,11 +158,6 @@ --> <dependency> <groupId>io.openmessaging</groupId> - <artifactId>openmessaging-connect-runtime</artifactId> - <version>0.0.1-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>io.openmessaging</groupId> <artifactId>openmessaging-connector</artifactId> <version>0.1.0-beta</version> </dependency> diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java new file mode 100644 index 0000000..9a7f1ba --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.kafka.config; + +import java.util.*; + +public class ConfigDefine { + + public static String TASK_NUM = "tasks.num"; + public static String TOPICS = "kafka.topics"; + public static String GROUP_ID = "kafka.group.id"; + public static String BOOTSTRAP_SERVER = "kafka.bootstrap.server"; + public static String CONNECTOR_CLASS = "connector-class"; + public static String SOURCE_RECORD_CONVERTER = "source-record-converter"; + public static String ROCKETMQ_TOPIC = "rocketmq.topic"; + + private String bootstrapServers; + private String topics; + private String groupId; + + public String getTopics() { + return topics; + } + + public void setTopics(String topics) { + this.topics = topics; + } + + public String getBootstrapServers() { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){ + { + add(TOPICS); + add(GROUP_ID); + add(BOOTSTRAP_SERVER); + } + }; +} diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/Config.java b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java similarity index 66% rename from src/main/java/org/apache/rocketmq/connect/kafka/Config.java rename to src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java index 869597e..0587dae 100644 --- a/src/main/java/org/apache/rocketmq/connect/kafka/Config.java +++ b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java @@ -14,62 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.rocketmq.connect.kafka; +package org.apache.rocketmq.connect.kafka.config; import io.openmessaging.KeyValue; -import java.lang.reflect.Method; -import java.util.*; - -public class Config { - - public static String TASK_NUM = "tasks.num"; - public static String TOPICS = "kafka.topics"; - public static String GROUP_ID = "kafka.group.id"; - public static String BOOTSTRAP_SERVER = "kafka.bootstrap.server"; - public static String ROCKETMQ_TOPIC = "rocketmq.topic"; - - private String bootstrapServers; - private String topics; - private String groupId; - public String getTopics() { - return topics; - } - - public void setTopics(String topics) { - this.topics = topics; - } - - public String getBootstrapServers() { - return bootstrapServers; - } +import java.lang.reflect.Method; - public void setBootstrapServers(String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - } +public class ConfigUtil { - public String getGroupId() { - return groupId; - } + public static <T> void load(KeyValue props, Object object) { - public void setGroupId(String groupId) { - this.groupId = groupId; + properties2Object(props, object); } - public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){ - { - add(TOPICS); - add(GROUP_ID); - add(BOOTSTRAP_SERVER); - } - }; - - public void load(KeyValue props) { - properties2Object(props, this); - } - - private void properties2Object(final KeyValue p, final Object object) { + private static <T> void properties2Object(final KeyValue p, final Object object) { Method[] methods = object.getClass().getMethods(); for (Method method : methods) { @@ -109,8 +67,4 @@ public class Config { } } } - - public static Set<String> getRequestConfig() { - return REQUEST_CONFIG; - } } diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java index ba30901..567a8e9 100644 --- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java @@ -18,18 +18,17 @@ package org.apache.rocketmq.connect.kafka.connector; import io.openmessaging.KeyValue; -import io.openmessaging.connect.runtime.common.ConnectKeyValue; -import io.openmessaging.connect.runtime.config.RuntimeConfigDefine; import io.openmessaging.connector.api.Task; import io.openmessaging.connector.api.source.SourceConnector; -import org.apache.rocketmq.connect.kafka.Config; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.connect.kafka.config.ConfigDefine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -public class KafkaSourceConnector extends SourceConnector{ +public class KafkaSourceConnector extends SourceConnector { private static final Logger log = LoggerFactory.getLogger(KafkaSourceConnector.class); private KeyValue connectConfig; @@ -42,12 +41,12 @@ public class KafkaSourceConnector extends SourceConnector{ public String verifyAndSetConfig(KeyValue config) { log.info("KafkaSourceConnector verifyAndSetConfig enter"); - for ( String key : config.keySet()) { + for (String key : config.keySet()) { log.info("connector verifyAndSetConfig: key:{}, value:{}", key, config.getString(key)); } - for(String requestKey : Config.REQUEST_CONFIG){ - if(!config.containsKey(requestKey)){ + for (String requestKey : ConfigDefine.REQUEST_CONFIG) { + if (!config.containsKey(requestKey)) { return "Request Config key: " + requestKey; } } @@ -82,20 +81,22 @@ public class KafkaSourceConnector extends SourceConnector{ @Override public List<KeyValue> taskConfigs() { + if (connectConfig == null) { + return new ArrayList<KeyValue>(); + } log.info("Source Connector taskConfigs enter"); List<KeyValue> configs = new ArrayList<>(); - int task_num = connectConfig.getInt(Config.TASK_NUM); + int task_num = connectConfig.getInt(ConfigDefine.TASK_NUM); log.info("Source Connector taskConfigs: task_num:" + task_num); - for (int i=0; i < task_num; ++i) { - KeyValue config = new ConnectKeyValue(); - config.put(Config.BOOTSTRAP_SERVER, connectConfig.getString(Config.BOOTSTRAP_SERVER)); - config.put(Config.TOPICS, connectConfig.getString(Config.TOPICS)); - config.put(Config.GROUP_ID, connectConfig.getString(Config.GROUP_ID)); - - config.put(RuntimeConfigDefine.CONNECTOR_CLASS, connectConfig.getString(RuntimeConfigDefine.CONNECTOR_CLASS)); - config.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, connectConfig.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER)); - config.put(RuntimeConfigDefine.OMS_DRIVER_URL, connectConfig.getString(RuntimeConfigDefine.OMS_DRIVER_URL)); + for (int i = 0; i < task_num; ++i) { + KeyValue config = new DefaultKeyValue(); + config.put(ConfigDefine.BOOTSTRAP_SERVER, connectConfig.getString(ConfigDefine.BOOTSTRAP_SERVER)); + config.put(ConfigDefine.TOPICS, connectConfig.getString(ConfigDefine.TOPICS)); + config.put(ConfigDefine.GROUP_ID, connectConfig.getString(ConfigDefine.GROUP_ID)); + + config.put(ConfigDefine.CONNECTOR_CLASS, connectConfig.getString(ConfigDefine.CONNECTOR_CLASS)); + config.put(ConfigDefine.SOURCE_RECORD_CONVERTER, connectConfig.getString(ConfigDefine.SOURCE_RECORD_CONVERTER)); configs.add(config); } return configs; diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java index 1f7ed00..6122b0e 100644 --- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java +++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java @@ -23,7 +23,7 @@ import io.openmessaging.connector.api.data.*; import io.openmessaging.connector.api.source.SourceTask; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; -import org.apache.rocketmq.connect.kafka.Config; +import org.apache.rocketmq.connect.kafka.config.ConfigDefine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,8 +97,8 @@ public class KafkaSourceTask extends SourceTask { this.currentTPList = new ArrayList<>(); this.config = taskConfig; Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(Config.BOOTSTRAP_SERVER)); - props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(Config.GROUP_ID)); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(ConfigDefine.BOOTSTRAP_SERVER)); + props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(ConfigDefine.GROUP_ID)); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer"); @@ -106,7 +106,7 @@ public class KafkaSourceTask extends SourceTask { this.consumer = new KafkaConsumer<>(props); - String topics = this.config.getString(Config.TOPICS); + String topics = this.config.getString(ConfigDefine.TOPICS); for (String topic : topics.split(",")) { if (!topic.isEmpty()) { topicList.add(topic); diff --git a/src/main/resources/connect-kafka-source.properties b/src/main/resources/connect-kafka-source.properties index f974cb9..5ab36ed 100644 --- a/src/main/resources/connect-kafka-source.properties +++ b/src/main/resources/connect-kafka-source.properties @@ -15,7 +15,6 @@ name=rocketmq-connect-kafka connector-class=org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector -oms-driver-url=oms:rocketmq://101.132.96.164:9876/default:default source-record-converter=io.openmessaging.connect.runtime.converter.JsonConverter task.num=2 kafka.bootstrap.server=47.112.213.204:9092;47.112.213.204:9092 diff --git a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java index c64b5e7..1e12ca3 100644 --- a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java +++ b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java @@ -19,7 +19,7 @@ package org.apache.rocketmq.connect.kafka.connector; import io.openmessaging.KeyValue; import io.openmessaging.internal.DefaultKeyValue; -import org.apache.rocketmq.connect.kafka.Config; +import org.apache.rocketmq.connect.kafka.config.ConfigDefine; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -31,8 +31,8 @@ public class KafkaSourceConnectorTest { public void verifyAndSetConfigTest() { KeyValue keyValue = new DefaultKeyValue(); - for (String requestKey : Config.REQUEST_CONFIG) { - assertEquals(connector.verifyAndSetConfig(keyValue), "Request config key: " + requestKey); + for (String requestKey : ConfigDefine.REQUEST_CONFIG) { + assertEquals(connector.verifyAndSetConfig(keyValue), "Request Config key: " + requestKey); keyValue.put(requestKey, requestKey); } assertEquals(connector.verifyAndSetConfig(keyValue), ""); @@ -40,17 +40,18 @@ public class KafkaSourceConnectorTest { @Test public void taskClassTest() { - assertEquals(connector.taskClass(), KafkaSourceConnector.class); + assertEquals(connector.taskClass(), KafkaSourceTask.class); } @Test public void taskConfigsTest() { - assertEquals(connector.taskConfigs().get(0), null); + assertEquals(connector.taskConfigs().size(), 0); KeyValue keyValue = new DefaultKeyValue(); - for (String requestKey : Config.REQUEST_CONFIG) { + for (String requestKey : ConfigDefine.REQUEST_CONFIG) { keyValue.put(requestKey, requestKey); } + keyValue.put(ConfigDefine.TASK_NUM,1); connector.verifyAndSetConfig(keyValue); - assertEquals(connector.taskConfigs().get(0), keyValue); + assertEquals(connector.taskConfigs().get(0).getString(ConfigDefine.TOPICS), keyValue.getString(ConfigDefine.TOPICS)); } }
