This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit bbb12023c30a11d1b6190d0e00fdb5c6422d6f05 Author: jonnxu <[email protected]> AuthorDate: Fri Jul 5 01:39:45 2019 +0800 rocketmq-connect-kafka --- pom.xml | 210 ++++++++++++++++++ .../org/apache/rocketmq/connect/kafka/Config.java | 116 ++++++++++ .../kafka/connector/KafkaSourceConnector.java | 103 +++++++++ .../connect/kafka/connector/KafkaSourceTask.java | 247 +++++++++++++++++++++ src/main/resources/connect-kafka-source.properties | 23 ++ .../kafka/connector/KafkaSourceConnectorTest.java | 56 +++++ .../kafka/connector/KafkaSourceTaskTest.java | 43 ++++ 7 files changed, 798 insertions(+) diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..08712ca --- /dev/null +++ b/pom.xml @@ -0,0 +1,210 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-connect-kafka</artifactId> + <version>0.0.1-SNAPSHOT</version> + <name>rocketmq-connect-kafka</name> + <packaging>pom</packaging> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + <distribution>repo</distribution> + </license> + </licenses> + + <scm> + <url>https://github.com/openmessaging/openmessaging-connector</url> + <connection>scm:git:[email protected]:openmessaging/openmessaging-connector.git</connection> + <developerConnection>scm:git:[email protected]:openmessaging/openmessaging-connector.git</developerConnection> + <tag>HEAD</tag> + </scm> + + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + + <!-- Compiler settings properties --> + <maven.compiler.source>1.8</maven.compiler.source> + 
<maven.compiler.target>1.8</maven.compiler.target> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>versions-maven-plugin</artifactId> + <version>2.3</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>clirr-maven-plugin</artifactId> + <version>2.7</version> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.6.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <compilerVersion>${maven.compiler.source}</compilerVersion> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <argLine>-Xms512m -Xmx1024m</argLine> + <forkMode>always</forkMode> + <includes> + <include>**/*Test.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.6</version> + <configuration> + <locales>en_US</locales> + <outputEncoding>UTF-8</outputEncoding> + <inputEncoding>UTF-8</inputEncoding> + </configuration> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>3.0.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.10.4</version> + <configuration> + <charset>UTF-8</charset> + <locale>en_US</locale> + <excludePackageNames>io.openmessaging.internal</excludePackageNames> + </configuration> + <executions> + <execution> + <id>aggregate</id> + <goals> + <goal>aggregate</goal> + </goals> + <phase>site</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.0.2</version> + <configuration> + 
<encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.4</version> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.11.0.2</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <!-- + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>2.6.0</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>2.6.3</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.5</version> + <scope>test</scope> + </dependency> + --> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-connect-runtime</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-connector</artifactId> + <version>0.1.0-beta</version> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-api</artifactId> + <version>1.0.0-alpha</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-openmessaging</artifactId> + <version>4.3.2</version> + </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + <version>1.2.51</version> + </dependency> + <dependency> + <groupId>io.javalin</groupId> + <artifactId>javalin</artifactId> + <version>1.3.0</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + 
<version>1.7.7</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>1.0.13</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + <version>1.0.13</version> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>1.2</version> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/Config.java b/src/main/java/org/apache/rocketmq/connect/kafka/Config.java new file mode 100644 index 0000000..869597e --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/kafka/Config.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.kafka; + +import io.openmessaging.KeyValue; +import java.lang.reflect.Method; +import java.util.*; + +public class Config { + + public static String TASK_NUM = "tasks.num"; + public static String TOPICS = "kafka.topics"; + public static String GROUP_ID = "kafka.group.id"; + public static String BOOTSTRAP_SERVER = "kafka.bootstrap.server"; + public static String ROCKETMQ_TOPIC = "rocketmq.topic"; + + private String bootstrapServers; + private String topics; + private String groupId; + + public String getTopics() { + return topics; + } + + public void setTopics(String topics) { + this.topics = topics; + } + + public String getBootstrapServers() { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){ + { + add(TOPICS); + add(GROUP_ID); + add(BOOTSTRAP_SERVER); + } + }; + + public void load(KeyValue props) { + properties2Object(props, this); + } + + private void properties2Object(final KeyValue p, final Object object) { + + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(4); + String first = mn.substring(3, 4); + + String key = first.toLowerCase() + tmp; + String property = p.getString(key); + if (property != null) { + Class<?>[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = 
Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } + + public static Set<String> getRequestConfig() { + return REQUEST_CONFIG; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java new file mode 100644 index 0000000..ba30901 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.kafka.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.connect.runtime.common.ConnectKeyValue; +import io.openmessaging.connect.runtime.config.RuntimeConfigDefine; +import io.openmessaging.connector.api.Task; +import io.openmessaging.connector.api.source.SourceConnector; +import org.apache.rocketmq.connect.kafka.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class KafkaSourceConnector extends SourceConnector{ + private static final Logger log = LoggerFactory.getLogger(KafkaSourceConnector.class); + + private KeyValue connectConfig; + + public KafkaSourceConnector() { + super(); + } + + @Override + public String verifyAndSetConfig(KeyValue config) { + + log.info("KafkaSourceConnector verifyAndSetConfig enter"); + for ( String key : config.keySet()) { + log.info("connector verifyAndSetConfig: key:{}, value:{}", key, config.getString(key)); + } + + for(String requestKey : Config.REQUEST_CONFIG){ + if(!config.containsKey(requestKey)){ + return "Request Config key: " + requestKey; + } + } + this.connectConfig = config; + return ""; + } + + @Override + public void start() { + log.info("KafkaSourceConnector start enter"); + } + + @Override + public void stop() { + log.info("KafkaSourceConnector stop enter"); + } + + @Override + public void pause() { + + } + + @Override + public void resume() { + + } + + @Override + public Class<? 
extends Task> taskClass() { + return KafkaSourceTask.class; + } + + @Override + public List<KeyValue> taskConfigs() { + + log.info("Source Connector taskConfigs enter"); + List<KeyValue> configs = new ArrayList<>(); + int task_num = connectConfig.getInt(Config.TASK_NUM); + log.info("Source Connector taskConfigs: task_num:" + task_num); + for (int i=0; i < task_num; ++i) { + KeyValue config = new ConnectKeyValue(); + config.put(Config.BOOTSTRAP_SERVER, connectConfig.getString(Config.BOOTSTRAP_SERVER)); + config.put(Config.TOPICS, connectConfig.getString(Config.TOPICS)); + config.put(Config.GROUP_ID, connectConfig.getString(Config.GROUP_ID)); + + config.put(RuntimeConfigDefine.CONNECTOR_CLASS, connectConfig.getString(RuntimeConfigDefine.CONNECTOR_CLASS)); + config.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, connectConfig.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER)); + config.put(RuntimeConfigDefine.OMS_DRIVER_URL, connectConfig.getString(RuntimeConfigDefine.OMS_DRIVER_URL)); + configs.add(config); + } + return configs; + } +} diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java new file mode 100644 index 0000000..d4b39e0 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.kafka.connector; + +import com.alibaba.fastjson.JSON; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.*; +import io.openmessaging.connector.api.source.SourceTask; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.rocketmq.connect.kafka.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.management.ManagementFactory; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.util.*; + +public class KafkaSourceTask extends SourceTask { + + private static final Logger log = LoggerFactory.getLogger(KafkaSourceTask.class); + private KafkaConsumer<ByteBuffer, ByteBuffer> consumer; + private KeyValue config; + private List<String> topicList; + private List<TopicPartition> currentTPList; + + @Override + public Collection<SourceDataEntry> poll() { + + try { + ArrayList<SourceDataEntry> entries = new ArrayList<>(); + ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(1000); + if (records.count() > 0) { + log.info("consumer.poll, records.count {}", records.count()); + } + for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) { + String topic_partition = record.topic() + "-" + record.partition(); + log.info("Received {} record: {} ", topic_partition, record); + + Schema schema = new Schema(); + List<Field> fields = new ArrayList<>(); + fields.add(new Field(0, "key", FieldType.BYTES)); + 
fields.add(new Field(1, "value", FieldType.BYTES)); + schema.setName(record.topic()); + schema.setFields(fields); + schema.setDataSource(record.topic()); + + ByteBuffer sourcePartition = ByteBuffer.wrap(topic_partition.getBytes()); + ByteBuffer sourcePosition = ByteBuffer.allocate(8); + sourcePosition.asLongBuffer().put(record.offset()); + + DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema); + dataEntryBuilder.entryType(EntryType.CREATE); + dataEntryBuilder.queue(record.topic()); //queueName will be set to RocketMQ topic by runtime + dataEntryBuilder.timestamp(System.currentTimeMillis()); + if (record.key() != null) { + dataEntryBuilder.putFiled("key", JSON.toJSONString(record.key().array())); + } else { + dataEntryBuilder.putFiled("key", null); + } + dataEntryBuilder.putFiled("value", JSON.toJSONString(record.value().array())); + SourceDataEntry entry = dataEntryBuilder.buildSourceDataEntry(sourcePartition, sourcePosition); + entries.add(entry); + } + + log.info("poll return entries size {} ", entries.size()); + return entries; + } catch (Exception e) { + e.printStackTrace(); + log.error("poll exception {}", e); + } + return null; + } + + @Override + public void start(KeyValue taskConfig) { + log.info("source task start enter"); + this.topicList = new ArrayList<>(); + this.currentTPList = new ArrayList<>(); + this.config = taskConfig; + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(Config.BOOTSTRAP_SERVER)); + props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(Config.GROUP_ID)); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer"); + + this.consumer = 
new KafkaConsumer<>(props); + + String topics = this.config.getString(Config.TOPICS); + for (String topic : topics.split(",")) { + if (!topic.isEmpty()) { + topicList.add(topic); + } + } + + consumer.subscribe(topicList, new MyRebalanceListener()); + log.info("source task subscribe topicList {}", topicList); + } + + @Override + public void stop() { + log.info("source task stop enter"); + try { + commitOffset(currentTPList, true); + consumer.close(); + } catch (Exception e) { + log.warn("{} consumer {} close exception {}", this, consumer, e); + } + } + + @Override + public void pause() { + log.info("source task pause ..."); + } + + @Override + public void resume() { + log.info("source task resume ..."); + } + + public String toString() { + String name = ManagementFactory.getRuntimeMXBean().getName(); + String pid = name.split("@")[0]; + return "KafkaSourceTask-PID[" + pid + "]-" + Thread.currentThread().toString(); + } + + public static TopicPartition getTopicPartition(ByteBuffer buffer) + { + Charset charset = null; + CharsetDecoder decoder = null; + CharBuffer charBuffer = null; + try + { + charset = Charset.forName("UTF-8"); + decoder = charset.newDecoder(); + charBuffer = decoder.decode(buffer.asReadOnlyBuffer()); + String topic_partition = charBuffer.toString(); + int index = topic_partition.lastIndexOf('-'); + if (index != -1 && index > 1) { + String topic = topic_partition.substring(0, index - 1); + int partition = Integer.parseInt(topic_partition.substring(index + 1)); + return new TopicPartition(topic, partition); + } + } + catch (Exception ex) + { + ex.printStackTrace(); + log.warn("getString Exception {}", ex); + } + return null; + } + + private void commitOffset(Collection<TopicPartition> tpList, boolean isClose) { + + if(tpList == null || tpList.isEmpty()) + return; + + log.info("commitOffset {} topic partition {}", KafkaSourceTask.this, tpList); + List<ByteBuffer> topic_partition_list = new ArrayList<>(); + for (TopicPartition tp : tpList) { + 
topic_partition_list.add(ByteBuffer.wrap((tp.topic()+"-"+tp.partition()).getBytes())); + } + + Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>(); + Map<ByteBuffer, ByteBuffer> topic_position_map = context.positionStorageReader().getPositions(topic_partition_list); + for (Map.Entry<ByteBuffer, ByteBuffer> entry : topic_position_map.entrySet()) { + TopicPartition tp = getTopicPartition(entry.getKey()); + if (tp != null && tpList.contains(tp)) { + //positionStorage store more than this task's topic and partition + try { + long local_offset = entry.getValue().asLongBuffer().get(); + commitOffsets.put(tp, new OffsetAndMetadata(local_offset)); + } catch (Exception e) { + log.warn("commitOffset get local offset exception {}", e); + } + } + } + + commitOffsets.entrySet().stream().forEach((Map.Entry<TopicPartition, OffsetAndMetadata> entry) -> + log.info("commitOffset {}, TopicPartition:{} offset:{}", KafkaSourceTask.this, entry.getKey(), entry.getValue())); + if (!commitOffsets.isEmpty()) { + if (isClose) { + consumer.commitSync(commitOffsets); + } else { + consumer.commitAsync(commitOffsets, new MyOffsetCommitCallback()); + } + } + } + + private class MyOffsetCommitCallback implements OffsetCommitCallback { + + @Override + public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { + if (e != null) { + log.warn("commit async excepiton {}", e); + map.entrySet().stream().forEach((Map.Entry<TopicPartition, OffsetAndMetadata> entry) -> { + log.warn("commit exception, TopicPartition:{} offset: {}", entry.getKey().toString(), entry.getValue().offset()); + }); + return; + } + } + } + + private class MyRebalanceListener implements ConsumerRebalanceListener { + + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + + currentTPList.clear(); + for (TopicPartition tp : partitions) { + currentTPList.add(tp); + } + currentTPList.stream().forEach((TopicPartition tp)-> log.info("onPartitionsAssigned 
TopicPartition {}", tp)); + } + + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + + log.info("onPartitionsRevoked {} Partitions revoked", KafkaSourceTask.this); + try { + commitOffset(partitions, false); + } catch (Exception e) { + log.warn("onPartitionsRevoked exception", e); + } + } + } +} diff --git a/src/main/resources/connect-kafka-source.properties b/src/main/resources/connect-kafka-source.properties new file mode 100644 index 0000000..f974cb9 --- /dev/null +++ b/src/main/resources/connect-kafka-source.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +name=rocketmq-connect-kafka +connector-class=org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector +oms-driver-url=oms:rocketmq://101.132.96.164:9876/default:default +source-record-converter=io.openmessaging.connect.runtime.converter.JsonConverter +tasks.num=2 +kafka.bootstrap.server=47.112.213.204:9092,47.112.213.204:9092 +kafka.topics=jonnxu +kafka.group.id=connect-kafka-source-consumer-group diff --git a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java new file mode 100644 index 0000000..c64b5e7 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.connect.kafka.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.connect.kafka.Config; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class KafkaSourceConnectorTest { + KafkaSourceConnector connector = new KafkaSourceConnector(); + + @Test + public void verifyAndSetConfigTest() { + KeyValue keyValue = new DefaultKeyValue(); + + for (String requestKey : Config.REQUEST_CONFIG) { + assertEquals(connector.verifyAndSetConfig(keyValue), "Request config key: " + requestKey); + keyValue.put(requestKey, requestKey); + } + assertEquals(connector.verifyAndSetConfig(keyValue), ""); + } + + @Test + public void taskClassTest() { + assertEquals(connector.taskClass(), KafkaSourceConnector.class); + } + + @Test + public void taskConfigsTest() { + assertEquals(connector.taskConfigs().get(0), null); + KeyValue keyValue = new DefaultKeyValue(); + for (String requestKey : Config.REQUEST_CONFIG) { + keyValue.put(requestKey, requestKey); + } + connector.verifyAndSetConfig(keyValue); + assertEquals(connector.taskConfigs().get(0), keyValue); + } +} diff --git a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java new file mode 100644 index 0000000..57239f6 --- /dev/null +++ b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.kafka.connector; + +import io.openmessaging.connector.api.data.SourceDataEntry; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; + +public class KafkaSourceTaskTest { + + @Test + public void pollTest() throws Exception { + KafkaSourceTask task = new KafkaSourceTask(); + Field config = KafkaSourceTask.class.getDeclaredField("config"); + config.setAccessible(true); + + Collection<SourceDataEntry> list = task.poll(); + Assert.assertEquals(list.size(), 1); + + list = task.poll(); + Assert.assertEquals(list.size(), 0); + + } +}
