This is an automated email from the ASF dual-hosted git repository.
sunxiaojian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 5ddf386 rocketmq-connect-kafka-connector-adapter 0.0.1-SNAPSHOT (#247)
5ddf386 is described below
commit 5ddf3862f55c720114b51a410c911faf14c8dcb6
Author: 欧夺标 <[email protected]>
AuthorDate: Fri Aug 19 14:49:46 2022 +0800
rocketmq-connect-kafka-connector-adapter 0.0.1-SNAPSHOT (#247)
Co-authored-by: ouduobiao <[email protected]>
---
.../README.md | 91 ++++++++++
.../pom.xml | 192 ++++++++++++++++++++
.../connect/kafka/config/ConfigDefine.java | 28 +++
.../kafka/connector/KafkaRocketmqConnector.java | 128 +++++++++++++
.../connector/KafkaRocketmqSinkConnector.java | 43 +++++
.../kafka/connector/KafkaRocketmqSinkTask.java | 199 +++++++++++++++++++++
.../connector/KafkaRocketmqSourceConnector.java | 43 +++++
.../kafka/connector/KafkaRocketmqSourceTask.java | 185 +++++++++++++++++++
.../connector/RocketmqKafkaConnectorContext.java | 21 +++
.../connector/RocketmqKafkaSinkTaskContext.java | 128 +++++++++++++
.../connector/RocketmqKafkaSourceTaskContext.java | 64 +++++++
.../rocketmq/connect/kafka/util/ConfigUtil.java | 34 ++++
.../connect/kafka/util/KafkaPluginsUtil.java | 25 +++
.../rocketmq/connect/kafka/util/RecordUtil.java | 157 ++++++++++++++++
14 files changed, 1338 insertions(+)
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/README.md b/connectors/rocketmq-connect-kafka-connector-adapter/README.md
new file mode 100644
index 0000000..01b7a63
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/README.md
@@ -0,0 +1,91 @@
+**rocketmq-connect-kafka-connector-adapter**
+
+The goal of this project is to run Kafka connectors on rocketmq-connect, so that data can be imported into and exported out of RocketMQ.
+
+**Parameters**
+
+Parameters fall into three categories: rocketmq connect runtime parameters, kafka-connector-adapter parameters, and the specific kafka connector's own parameters.
+
+rocketmq connect runtime parameters:
+- **connector-class**: the class name of the kafka-connector-adapter.
+
+  For a SourceConnector, use org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector.
+
+  For a SinkConnector, use org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector.
+
+- **connect-topicname**: the rocketmq topic to import data into or export data from
+- **tasks.num**: the number of tasks to start
+
+kafka-connector-adapter parameters:
+- **connector.class**: the class name of the kafka connector
+- **plugin.path**: the kafka connector plugin path
+
+Specific kafka connector parameters:
+
+Refer to the documentation of the specific kafka connector. A combined example of all three categories follows.
+
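+All three categories go into a single JSON payload when creating a connector. A minimal sketch (mirroring the file source connector from the quick start below; the tasks.num value here is illustrative):
+
+```
+{
+  "connector-class": "org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector",
+  "connect-topicname": "fileTopic",
+  "tasks.num": "1",
+  "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
+  "plugin.path": "/tmp/kafka-plugins",
+  "topic": "fileTopic",
+  "file": "/tmp/test-source-file.txt"
+}
+```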
+
+# Quick start
+
+This demo shows how to start the kafka-file-connector.
+
+The adapted kafka-file-connector reads data from a source file and sends it to the RocketMQ cluster, then reads the messages back from the topic and writes them to a destination file.
+
+## 1. Get the kafka-file-connector
+
+1. Download the Kafka binary distribution: https://kafka.apache.org/downloads
+2. Unpack it and locate the kafka-file-connector jar in the libs directory: connect-file-{version}.jar
+3. Copy the jar into a dedicated directory that serves as the kafka connector plugin path (plugin.path), e.g. /tmp/kafka-plugins (see the sketch below)
+
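+A minimal sketch of these steps (the Kafka version 3.2.0 and the paths are examples; adjust them to your download):
+
+```
+tar -zxf kafka_2.13-3.2.0.tgz
+mkdir -p /tmp/kafka-plugins
+cp kafka_2.13-3.2.0/libs/connect-file-3.2.0.jar /tmp/kafka-plugins/
+```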
+
+## 2. Build rocketmq-connect-kafka-connector-adapter
+
+```
+git clone https://github.com/apache/rocketmq-connect.git
+
+cd rocketmq-connect/connectors/rocketmq-connect-kafka-connector-adapter/
+
+mvn package
+
+```
+Finally, copy target/rocketmq-connect-kafka-connector-adapter-0.0.1-SNAPSHOT-jar-with-dependencies.jar into the rocketmq plugin directory, and set pluginPaths in connect-standalone.conf to that directory, e.g. /tmp/rocketmq-plugins (see the sketch below).
+
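+A minimal sketch, assuming /tmp/rocketmq-plugins as the rocketmq plugin directory:
+
+```
+mkdir -p /tmp/rocketmq-plugins
+cp target/rocketmq-connect-kafka-connector-adapter-0.0.1-SNAPSHOT-jar-with-dependencies.jar /tmp/rocketmq-plugins/
+
+# in conf/connect-standalone.conf of the rocketmq-connect distribution:
+# pluginPaths=/tmp/rocketmq-plugins
+```
+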
+## 3. Run the worker
+
+```
+cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
+
+sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
+
+```
+
+## 4. Start the source connector
+
+```
+touch /tmp/test-source-file.txt
+
+echo "Hello \r\nRocketMQ\r\n Connect" >> /tmp/test-source-file.txt
+
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector","connect-topicname":"fileTopic","connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","plugin.path":"/tmp/kafka-plugins","topic":"fileTopic","file":"/tmp/test-source-file.txt"}'
+```
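+
+In this payload, connector-class and connect-topicname are rocketmq connect runtime parameters, connector.class and plugin.path are kafka-connector-adapter parameters, and topic and file belong to the FileStreamSourceConnector itself.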
+
+## 5. Start the sink connector
+
+```
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector","connect-topicname":"fileTopic","connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","plugin.path":"/tmp/kafka-plugins","file":"/tmp/test-sink-file.txt"}'
+
+cat /tmp/test-sink-file.txt
+```
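+
+If the pipeline works, cat should print the text written in step 4 (depending on your shell, the \r\n sequences written by echo may appear literally or as line breaks):
+
+```
+Hello \r\nRocketMQ\r\n Connect
+```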
+
+# kafka connect transform
+
+todo
+
+# How to run kafka-mongo-connector
+
+todo
+
+# How to run kafka-jdbc-connector
+
+todo
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/pom.xml b/connectors/rocketmq-connect-kafka-connector-adapter/pom.xml
new file mode 100644
index 0000000..5f62691
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/pom.xml
@@ -0,0 +1,192 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-kafka-connector-adapter</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+ <!-- Compiler settings properties -->
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <version>0.12</version>
+ <configuration>
+ <excludes>
+ <exclude>README.md</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <archive>
+                        <!-- The main class here doesn't make a lot of sense since it is dynamically loaded -->
+                        <manifest>
+                            <mainClass>org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <executions>
+ <execution>
+ <id>verify</id>
+ <phase>verify</phase>
+ <configuration>
+                            <configLocation>style/rmq_checkstyle.xml</configLocation>
+ <encoding>UTF-8</encoding>
+ <consoleOutput>true</consoleOutput>
+ <failsOnError>true</failsOnError>
+                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
+ <includeTestResources>false</includeTestResources>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-runtime</artifactId>
+ <version>3.2.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>0.1.4</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
new file mode 100644
index 0000000..9cbba08
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
@@ -0,0 +1,28 @@
+package org.apache.rocketmq.connect.kafka.config;
+
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ConfigDefine {
+    public static final String ROCKETMQ_CONNECTOR_CLASS = "connector-class";
+    public static final String CONNECTOR_CLASS = ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+    public static final String PLUGIN_PATH = "plugin.path";
+
+    public static final String TASK_CLASS = TaskConfig.TASK_CLASS_CONFIG;
+
+    public static final String KEY_CONVERTER = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
+    public static final String VALUE_CONVERTER = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
+    public static final String HEADER_CONVERTER = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
+
+
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
+        {
+            add(CONNECTOR_CLASS);
+            add(PLUGIN_PATH);
+        }
+    };
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java
new file mode 100644
index 0000000..89bee2c
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java
@@ -0,0 +1,128 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.connector.Connector;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
+import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
+import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class KafkaRocketmqConnector extends Connector {
+    private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqConnector.class);
+
+ private Connector childConnector;
+
+ private org.apache.kafka.connect.connector.Connector kafkaConnector;
+ private Plugins kafkaPlugins;
+ private Map<String, String> kafkaConnectorConfigs;
+
+ public KafkaRocketmqConnector(Connector childConnector) {
+ this.childConnector = childConnector;
+ }
+
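+    // Each Kafka task config is copied into a KeyValue and augmented with the adapter settings
+    // (plugin.path, connector/task class names, and any converters) so that the RocketMQ-side task
+    // can recreate the Kafka task inside the same isolated plugin classloader.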
+ @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> taskKeyValueConfigs = new ArrayList<>();
+        runWithConnectorLoader(() -> {
+            List<Map<String, String>> taskConfigs = this.kafkaConnector.taskConfigs(maxTasks);
+            taskKeyValueConfigs.addAll(
+                    taskConfigs
+                            .stream()
+                            .map(ConfigUtil::mapConfigToKeyValue)
+                            .collect(Collectors.toList())
+            );
+
+            taskKeyValueConfigs.forEach(kv -> {
+                kv.put(ConfigDefine.PLUGIN_PATH, this.kafkaConnectorConfigs.get(ConfigDefine.PLUGIN_PATH));
+                kv.put(ConfigDefine.CONNECTOR_CLASS, this.kafkaConnectorConfigs.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+                kv.put(ConfigDefine.TASK_CLASS, this.kafkaConnector.taskClass().getName());
+
+                kv.put(ConfigDefine.ROCKETMQ_CONNECTOR_CLASS, childConnector.getClass().getName());
+
+                if (this.kafkaConnectorConfigs.containsKey(ConfigDefine.KEY_CONVERTER)) {
+                    kv.put(ConfigDefine.KEY_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.KEY_CONVERTER));
+                }
+
+                if (this.kafkaConnectorConfigs.containsKey(ConfigDefine.VALUE_CONVERTER)) {
+                    kv.put(ConfigDefine.VALUE_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.VALUE_CONVERTER));
+                }
+
+                if (this.kafkaConnectorConfigs.containsKey(ConfigDefine.HEADER_CONVERTER)) {
+                    kv.put(ConfigDefine.HEADER_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.HEADER_CONVERTER));
+                }
+            });
+
+        });
+        return taskKeyValueConfigs;
+    }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return this.childConnector instanceof SourceConnector
+ ? KafkaRocketmqSourceTask.class : KafkaRocketmqSinkTask.class;
+ }
+
+ @Override
+ public void start(KeyValue config) {
+        runWithConnectorLoader(() -> {
+ this.kafkaConnector.start(this.kafkaConnectorConfigs);
+ });
+ }
+
+ @Override
+ public void stop() {
+        runWithConnectorLoader(() -> {
+ this.kafkaConnector.stop();
+ });
+ }
+
+
+ @Override
+ public void validate(KeyValue config) {
+
+ for(String requestConfig: ConfigDefine.REQUEST_CONFIG){
+ if(!config.containsKey(requestConfig)){
+                throw new ConnectException("missing config: " + requestConfig);
+ }
+ }
+
+        this.kafkaConnectorConfigs = ConfigUtil.keyValueConfigToMap(config);
+        log.info("kafka connector config is {}", this.kafkaConnectorConfigs);
+        this.kafkaPlugins = KafkaPluginsUtil.getPlugins(Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, this.kafkaConnectorConfigs.get(ConfigDefine.PLUGIN_PATH)));
+        String connectorClassName = this.kafkaConnectorConfigs.get(ConfigDefine.CONNECTOR_CLASS);
+        ClassLoader connectorLoader = this.kafkaPlugins.delegatingLoader().connectorLoader(connectorClassName);
+        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+        try {
+            this.kafkaConnector = this.kafkaPlugins.newConnector(connectorClassName);
+            this.kafkaConnector.validate(this.kafkaConnectorConfigs);
+            this.kafkaConnector.initialize(
+                    new RocketmqKafkaConnectorContext(getConnectorContext())
+            );
+        } finally {
+            Plugins.compareAndSwapLoaders(savedLoader);
+        }
+
+ }
+
+    private void runWithConnectorLoader(Runnable runnable){
+        ClassLoader current = this.kafkaPlugins.compareAndSwapLoaders(this.kafkaConnector);
+ try {
+ runnable.run();
+ } finally {
+ Plugins.compareAndSwapLoaders(current);
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java
new file mode 100644
index 0000000..9cec38e
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java
@@ -0,0 +1,43 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class KafkaRocketmqSinkConnector extends SinkConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSinkConnector.class);
+
+    private KafkaRocketmqConnector kafkaRocketmqConnector = new KafkaRocketmqConnector(this);
+
+ @Override
+ public List<KeyValue> taskConfigs(int maxTasks) {
+ return kafkaRocketmqConnector.taskConfigs(maxTasks);
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return kafkaRocketmqConnector.taskClass();
+ }
+
+ @Override
+ public void start(KeyValue config) {
+ kafkaRocketmqConnector.start(config);
+ }
+
+ @Override
+ public void stop() {
+ kafkaRocketmqConnector.stop();
+ }
+
+ @Override
+ public void validate(KeyValue config) {
+ kafkaRocketmqConnector.validate(config);
+ }
+
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java
new file mode 100644
index 0000000..c2b6cfa
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java
@@ -0,0 +1,199 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.connector.api.errors.RetriableException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
+import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
+import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil;
+import org.apache.rocketmq.connect.kafka.util.RecordUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+
+public class KafkaRocketmqSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSinkTask.class);
+
+ private org.apache.kafka.connect.sink.SinkTask kafkaSinkTask;
+ private ClassLoader classLoader;
+
+ private Converter keyConverter;
+ private Converter valueConverter;
+ private HeaderConverter headerConverter;
+
+ @Override
+ public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+ Collection<SinkRecord> records = new ArrayList<>(sinkRecords.size());
+        for(ConnectRecord sinkRecord: sinkRecords){
+            String topic = (String) sinkRecord.getPosition().getPartition().getPartition().get(RecordUtil.TOPIC);
+            SchemaAndValue valueSchemaAndValue = valueConverter.toConnectData(topic, ((String) sinkRecord.getData()).getBytes(StandardCharsets.UTF_8));
+            String key = sinkRecord.getExtension(RecordUtil.KAFKA_MSG_KEY);
+            SchemaAndValue keySchemaAndValue = null;
+            if(key != null) {
+                keySchemaAndValue = keyConverter.toConnectData(topic, key.getBytes(StandardCharsets.UTF_8));
+            }
+
+            SinkRecord record = new SinkRecord(
+                    RecordUtil.getTopicAndBrokerName(sinkRecord.getPosition().getPartition()),
+                    RecordUtil.getPartition(sinkRecord.getPosition().getPartition()),
+                    keySchemaAndValue == null ? null : keySchemaAndValue.schema(),
+                    keySchemaAndValue == null ? null : keySchemaAndValue.value(),
+                    valueSchemaAndValue.schema(), valueSchemaAndValue.value(),
+                    RecordUtil.getOffset(sinkRecord.getPosition().getOffset()),
+                    sinkRecord.getTimestamp(), TimestampType.NO_TIMESTAMP_TYPE,
+                    getHeaders(sinkRecord.getExtensions(), topic)
+            );
+            records.add(record);
+        }
+ try {
+ this.kafkaSinkTask.put(records);
+ } catch (org.apache.kafka.connect.errors.RetriableException e){
+ throw new RetriableException(e);
+ }
+ }
+
+ private ConnectHeaders getHeaders(KeyValue extensions, String topic){
+ ConnectHeaders headers = new ConnectHeaders();
+ for(String headerKey: extensions.keySet()){
+ if(RecordUtil.KAFKA_MSG_KEY.equals(headerKey)){
+ continue;
+ }
+            SchemaAndValue headerSchemaAndValue = headerConverter
+                    .toConnectHeader(topic, headerKey, extensions.getString(headerKey).getBytes(StandardCharsets.UTF_8));
+ headers.add(headerKey, headerSchemaAndValue);
+ }
+ return headers;
+ }
+
+
+
+ @Override
+ public void start(KeyValue config) {
+        Map<String, String> kafkaTaskProps = ConfigUtil.keyValueConfigToMap(config);
+        log.info("kafka connector task config is {}", kafkaTaskProps);
+        Plugins kafkaPlugins = KafkaPluginsUtil.getPlugins(Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, kafkaTaskProps.get(ConfigDefine.PLUGIN_PATH)));
+        String connectorClass = kafkaTaskProps.get(ConfigDefine.CONNECTOR_CLASS);
+        ClassLoader connectorLoader = kafkaPlugins.delegatingLoader().connectorLoader(connectorClass);
+        this.classLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+        try {
+            TaskConfig taskConfig = new TaskConfig(kafkaTaskProps);
+            Class<? extends Task> taskClass = taskConfig.getClass(ConfigDefine.TASK_CLASS).asSubclass(Task.class);
+            this.kafkaSinkTask = (org.apache.kafka.connect.sink.SinkTask) kafkaPlugins.newTask(taskClass);
+            initConverter(kafkaPlugins, kafkaTaskProps);
+            this.kafkaSinkTask.initialize(new RocketmqKafkaSinkTaskContext(sinkTaskContext));
+            this.kafkaSinkTask.start(kafkaTaskProps);
+ } catch (Throwable e){
+ recoverClassLoader();
+ throw e;
+ }
+ }
+
+ @Override
+ public void stop() {
+ try {
+ this.kafkaSinkTask.stop();
+ } finally {
+ recoverClassLoader();
+ }
+ }
+
+
+ private void recoverClassLoader(){
+ if(this.classLoader != null){
+ Plugins.compareAndSwapLoaders(this.classLoader);
+ this.classLoader = null;
+ }
+ }
+
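+    // Converter resolution mirrors Kafka Connect's worker behaviour: converters named in the
+    // connector config are tried first (CURRENT_CLASSLOADER); when absent, worker-style defaults
+    // (JsonConverter / SimpleHeaderConverter) are instantiated from the plugin classloader.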
+ private void initConverter(Plugins plugins, Map<String, String> taskProps){
+
+        ConfigDef converterConfigDef = new ConfigDef()
+                .define(ConfigDefine.KEY_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
+                .define(ConfigDefine.VALUE_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
+                .define(ConfigDefine.HEADER_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "");
+
+        Map<String, String> connProps = new HashMap<>();
+        if(taskProps.containsKey(ConfigDefine.KEY_CONVERTER)){
+            connProps.put(ConfigDefine.KEY_CONVERTER, taskProps.get(ConfigDefine.KEY_CONVERTER));
+        }
+        if(taskProps.containsKey(ConfigDefine.VALUE_CONVERTER)){
+            connProps.put(ConfigDefine.VALUE_CONVERTER, taskProps.get(ConfigDefine.VALUE_CONVERTER));
+        }
+        if(taskProps.containsKey(ConfigDefine.HEADER_CONVERTER)){
+            connProps.put(ConfigDefine.HEADER_CONVERTER, taskProps.get(ConfigDefine.HEADER_CONVERTER));
+        }
+        SimpleConfig connConfig = new SimpleConfig(converterConfigDef, connProps);
+
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(ConfigDefine.KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(ConfigDefine.VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(ConfigDefine.HEADER_CONVERTER, "org.apache.kafka.connect.storage.SimpleHeaderConverter");
+        SimpleConfig workerConfig = new SimpleConfig(converterConfigDef, workerProps);
+
+        keyConverter = plugins.newConverter(connConfig, ConfigDefine.KEY_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+        valueConverter = plugins.newConverter(connConfig, ConfigDefine.VALUE_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+        headerConverter = plugins.newHeaderConverter(connConfig, ConfigDefine.HEADER_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+
+        if (keyConverter == null) {
+            keyConverter = plugins.newConverter(workerConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
+            log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), sinkTaskContext.getTaskName());
+        } else {
+            log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), sinkTaskContext.getTaskName());
+        }
+        if (valueConverter == null) {
+            valueConverter = plugins.newConverter(workerConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
+            log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), sinkTaskContext.getTaskName());
+        } else {
+            log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), sinkTaskContext.getTaskName());
+        }
+        if (headerConverter == null) {
+            headerConverter = plugins.newHeaderConverter(workerConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
+            log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), sinkTaskContext.getTaskName());
+        } else {
+            log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), sinkTaskContext.getTaskName());
+        }
+ }
+
+ @Override
+    public void flush(Map<RecordPartition, RecordOffset> currentOffsets) throws ConnectException {
+
+        if(this.kafkaSinkTask == null){
+            log.warn("the task has not started, currentOffsets:{}", currentOffsets);
+            return;
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets.size());
+
+        for(Map.Entry<RecordPartition, RecordOffset> po: currentOffsets.entrySet()){
+            TopicPartition tp = RecordUtil.recordPartitionToTopicPartition(po.getKey());
+            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(RecordUtil.getOffset(po.getValue()));
+            offsets.put(tp, offsetAndMetadata);
+        }
+        this.kafkaSinkTask.flush(offsets);
+ }
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java
new file mode 100644
index 0000000..6f6e006
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java
@@ -0,0 +1,43 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+
+import java.util.List;
+
+
+public class KafkaRocketmqSourceConnector extends SourceConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSourceConnector.class);
+
+    private KafkaRocketmqConnector kafkaRocketmqConnector = new KafkaRocketmqConnector(this);
+
+ @Override
+ public List<KeyValue> taskConfigs(int maxTasks) {
+ return kafkaRocketmqConnector.taskConfigs(maxTasks);
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return kafkaRocketmqConnector.taskClass();
+ }
+
+ @Override
+ public void start(KeyValue config) {
+ kafkaRocketmqConnector.start(config);
+ }
+
+ @Override
+ public void stop() {
+ kafkaRocketmqConnector.stop();
+ }
+
+ @Override
+ public void validate(KeyValue config) {
+ kafkaRocketmqConnector.validate(config);
+ }
+
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java
new file mode 100644
index 0000000..3db8251
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java
@@ -0,0 +1,185 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.connector.api.data.*;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil;
+import org.apache.rocketmq.connect.kafka.util.RecordUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
+import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
+
+import java.util.*;
+
+public class KafkaRocketmqSourceTask extends SourceTask {
+    private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSourceTask.class);
+
+ private org.apache.kafka.connect.source.SourceTask kafkaSourceTask;
+
+ private ClassLoader classLoader;
+
+ private Converter keyConverter;
+ private Converter valueConverter;
+ private HeaderConverter headerConverter;
+
+
+ @Override
+ public List<ConnectRecord> poll() throws InterruptedException {
+
+ List<SourceRecord> sourceRecords = this.kafkaSourceTask.poll();
+
+ if(sourceRecords == null){
+ return null;
+ }
+
+        List<ConnectRecord> connectRecords = new ArrayList<>(sourceRecords.size());
+        for(SourceRecord sourceRecord: sourceRecords){
+            connectRecords.add(RecordUtil.toConnectRecord(sourceRecord,
+                    this.keyConverter, this.valueConverter, this.headerConverter));
+        }
+ return connectRecords;
+ }
+
+ @Override
+ public void start(KeyValue config) {
+        Map<String, String> kafkaTaskProps = ConfigUtil.keyValueConfigToMap(config);
+        log.info("kafka connector task config is {}", kafkaTaskProps);
+        Plugins kafkaPlugins = KafkaPluginsUtil.getPlugins(Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, kafkaTaskProps.get(ConfigDefine.PLUGIN_PATH)));
+        String connectorClass = kafkaTaskProps.get(ConfigDefine.CONNECTOR_CLASS);
+        ClassLoader connectorLoader = kafkaPlugins.delegatingLoader().connectorLoader(connectorClass);
+        this.classLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+        try {
+
+            TaskConfig taskConfig = new TaskConfig(kafkaTaskProps);
+            Class<? extends Task> taskClass = taskConfig.getClass(ConfigDefine.TASK_CLASS).asSubclass(Task.class);
+            this.kafkaSourceTask = (org.apache.kafka.connect.source.SourceTask) kafkaPlugins.newTask(taskClass);
+
+            initConverter(kafkaPlugins, kafkaTaskProps);
+
+            this.kafkaSourceTask.initialize(new RocketmqKafkaSourceTaskContext(sourceTaskContext));
+            this.kafkaSourceTask.start(kafkaTaskProps);
+ } catch (Throwable e){
+ recoverClassLoader();
+ throw e;
+ }
+ }
+
+ private void initConverter(Plugins plugins, Map<String, String> taskProps){
+
+        ConfigDef converterConfigDef = new ConfigDef()
+                .define(ConfigDefine.KEY_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
+                .define(ConfigDefine.VALUE_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
+                .define(ConfigDefine.HEADER_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "");
+
+        Map<String, String> connProps = new HashMap<>();
+        if(taskProps.containsKey(ConfigDefine.KEY_CONVERTER)){
+            connProps.put(ConfigDefine.KEY_CONVERTER, taskProps.get(ConfigDefine.KEY_CONVERTER));
+        }
+        if(taskProps.containsKey(ConfigDefine.VALUE_CONVERTER)){
+            connProps.put(ConfigDefine.VALUE_CONVERTER, taskProps.get(ConfigDefine.VALUE_CONVERTER));
+        }
+        if(taskProps.containsKey(ConfigDefine.HEADER_CONVERTER)){
+            connProps.put(ConfigDefine.HEADER_CONVERTER, taskProps.get(ConfigDefine.HEADER_CONVERTER));
+        }
+        SimpleConfig connConfig = new SimpleConfig(converterConfigDef, connProps);
+
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(ConfigDefine.KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(ConfigDefine.VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(ConfigDefine.HEADER_CONVERTER, "org.apache.kafka.connect.storage.SimpleHeaderConverter");
+        SimpleConfig workerConfig = new SimpleConfig(converterConfigDef, workerProps);
+
+        keyConverter = plugins.newConverter(connConfig, ConfigDefine.KEY_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+        valueConverter = plugins.newConverter(connConfig, ConfigDefine.VALUE_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+        headerConverter = plugins.newHeaderConverter(connConfig, ConfigDefine.HEADER_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+
+        if (keyConverter == null) {
+            keyConverter = plugins.newConverter(workerConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
+            log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), sourceTaskContext.getTaskName());
+        } else {
+            log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), sourceTaskContext.getTaskName());
+        }
+        if (valueConverter == null) {
+            valueConverter = plugins.newConverter(workerConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
+            log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), sourceTaskContext.getTaskName());
+        } else {
+            log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), sourceTaskContext.getTaskName());
+        }
+        if (headerConverter == null) {
+            headerConverter = plugins.newHeaderConverter(workerConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
+            log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), sourceTaskContext.getTaskName());
+        } else {
+            log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), sourceTaskContext.getTaskName());
+        }
+ }
+
+ @Override
+ public void stop() {
+ try {
+ this.kafkaSourceTask.stop();
+ } finally {
+ recoverClassLoader();
+ }
+ }
+
+ private void recoverClassLoader(){
+ if(this.classLoader != null){
+ Plugins.compareAndSwapLoaders(this.classLoader);
+ this.classLoader = null;
+ }
+ }
+
+
+ @Override
+ public void commit(ConnectRecord record, Map<String, String> metadata) {
+
+        if(this.kafkaSourceTask == null){
+            log.warn("the task has not started, metadata:{}", metadata);
+            return;
+        }
+
+        try {
+            long baseOffset = Long.valueOf(metadata.get(RecordUtil.QUEUE_OFFSET));
+            TopicPartition topicPartition = new TopicPartition(metadata.get(RecordUtil.TOPIC), Integer.valueOf(metadata.get(RecordUtil.QUEUE_ID)));
+            RecordMetadata recordMetadata = new RecordMetadata(
+                    topicPartition, baseOffset, 0,
+                    System.currentTimeMillis(), 0, 0
+            );
+            this.kafkaSourceTask.commitRecord(
+                    RecordUtil.toSourceRecord(record, this.keyConverter, this.valueConverter, this.headerConverter),
+                    recordMetadata
+            );
+        } catch (InterruptedException e){
+            Thread.currentThread().interrupt();
+        }
+ }
+
+ @Override
+ public void commit() {
+ if(this.kafkaSourceTask == null){
+ log.warn("the task is not start");
+ return;
+ }
+
+ try {
+ this.kafkaSourceTask.commit();
+ } catch (InterruptedException e){
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaConnectorContext.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaConnectorContext.java
new file mode 100644
index 0000000..ea20b81
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaConnectorContext.java
@@ -0,0 +1,21 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.connector.api.component.connector.ConnectorContext;
+
+public class RocketmqKafkaConnectorContext implements org.apache.kafka.connect.connector.ConnectorContext {
+    protected ConnectorContext rocketMqConnectorContext;
+
+    public RocketmqKafkaConnectorContext(ConnectorContext rocketMqConnectorContext) {
+ this.rocketMqConnectorContext = rocketMqConnectorContext;
+ }
+
+ @Override
+ public void requestTaskReconfiguration() {
+ this.rocketMqConnectorContext.requestTaskReconfiguration();
+ }
+
+ @Override
+ public void raiseError(Exception e) {
+ this.rocketMqConnectorContext.raiseError(e);
+ }
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java
new file mode 100644
index 0000000..848032b
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java
@@ -0,0 +1,128 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
+import org.apache.rocketmq.connect.kafka.util.RecordUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
+
+public class RocketmqKafkaSinkTaskContext implements org.apache.kafka.connect.sink.SinkTaskContext {
+
+    private static final Logger log = LoggerFactory.getLogger(RocketmqKafkaSinkTaskContext.class);
+
+    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
+
+ private SinkTaskContext sinkTaskContext;
+
+ public RocketmqKafkaSinkTaskContext(SinkTaskContext sinkTaskContext) {
+ this.sinkTaskContext = sinkTaskContext;
+ }
+
+ @Override
+ public Map<String, String> configs() {
+ return ConfigUtil.keyValueConfigToMap(sinkTaskContext.configs());
+ }
+
+ @Override
+ public void offset(Map<TopicPartition, Long> offsets) {
+
+        Map<RecordPartition, RecordOffset> offsets2 = new HashMap<>(offsets.size());
+ offsets.forEach((tp,offset) -> {
+ Map<String, String> map = RecordUtil.getPartitionMap(tp.topic());
+ map.put(RecordUtil.QUEUE_ID, tp.partition() + "");
+ RecordPartition recordPartition = new RecordPartition(map);
+
+ Map<String, String> offsetMap = new HashMap<>();
+ offsetMap.put(RecordUtil.QUEUE_OFFSET, offset + "");
+ RecordOffset recordOffset = new RecordOffset(offsetMap);
+
+ offsets2.put(recordPartition, recordOffset);
+ });
+ sinkTaskContext.resetOffset(offsets2);
+ }
+
+ @Override
+ public void offset(TopicPartition tp, long offset) {
+ this.offset(Collections.singletonMap(tp, offset));
+ }
+
+ @Override
+ public void timeout(long timeoutMs) {
+ log.info("ignore timeout because not impl, timeoutMs:{}", timeoutMs);
+ }
+
+ @Override
+ public Set<TopicPartition> assignment() {
+ return sinkTaskContext.assignment()
+ .stream()
+ .map(RecordUtil::recordPartitionToTopicPartition)
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public void pause(TopicPartition... partitions) {
+ sinkTaskContext.pause(
+ toRecordPartitions(partitions)
+ );
+ }
+
+ @Override
+ public void resume(TopicPartition... partitions) {
+ sinkTaskContext.resume(
+ toRecordPartitions(partitions)
+ );
+ }
+
+    private List<RecordPartition> toRecordPartitions(TopicPartition... partitions){
+ return Arrays.stream(partitions)
+ .map(RecordUtil::topicPartitionToRecordPartition)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void requestCommit() {
+ log.info("ignore requestCommit because not impl");
+ }
+
+
+ @Override
+ public ErrantRecordReporter errantRecordReporter() {
+ return new ErrantRecordReporter() {
+ @Override
+ public Future<Void> report(SinkRecord record, Throwable error) {
+
+ return EXECUTOR_SERVICE.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+
+                    Map<String, String> partitionMap = RecordUtil.getPartitionMap(record.topic());
+                    partitionMap.put(RecordUtil.QUEUE_ID, record.kafkaPartition() + "");
+                    RecordPartition recordPartition = new RecordPartition(partitionMap);
+
+                    Map<String, String> offsetMap = new HashMap<>();
+                    offsetMap.put(RecordUtil.QUEUE_OFFSET, record.kafkaOffset() + "");
+                    RecordOffset recordOffset = new RecordOffset(offsetMap);
+
+                    ConnectRecord connectRecord = new ConnectRecord(
+                            recordPartition, recordOffset, record.timestamp(),
+                            SchemaBuilder.string().build(), record.value()
+                    );
+                    sinkTaskContext.errorRecordReporter().report(connectRecord, error);
+                    return null;
+ }
+ });
+
+ }
+ };
+ }
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSourceTaskContext.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSourceTaskContext.java
new file mode 100644
index 0000000..c30294a
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSourceTaskContext.java
@@ -0,0 +1,64 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RocketmqKafkaSourceTaskContext implements org.apache.kafka.connect.source.SourceTaskContext {
+
+ private SourceTaskContext sourceTaskContext;
+
+    public RocketmqKafkaSourceTaskContext(SourceTaskContext sourceTaskContext) {
+ this.sourceTaskContext = sourceTaskContext;
+ }
+
+ @Override
+ public Map<String, String> configs() {
+ return ConfigUtil.keyValueConfigToMap(sourceTaskContext.configs());
+ }
+
+ @Override
+ public OffsetStorageReader offsetStorageReader() {
+ return new OffsetStorageReader(){
+ @Override
+            public <T> Map<String, Object> offset(Map<String, T> partition) {
+                return offsets(Collections.singletonList(partition)).get(partition);
+            }
+
+            @Override
+            public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions) {
+
+                Collection<RecordPartition> rocketmqPartitions = partitions.stream()
+                        .map(RecordPartition::new)
+                        .collect(Collectors.toList());
+
+                Map<Map<String, T>, Map<String, Object>> results = new HashMap<>(partitions.size());
+                sourceTaskContext
+                        .offsetStorageReader()
+                        .readOffsets(rocketmqPartitions)
+                        .forEach((p, o) -> {
+                            results.put((Map<String, T>) p.getPartition(), mayConvertToLongOffset(o.getOffset()));
+                        });
+                return results;
+            }
+
+            // Kafka offsets are represented as longs; after serialization and deserialization they may come back as ints
+            private Map<String, Object> mayConvertToLongOffset(Map<String, ?> offset){
+                Map<String, Object> result = new HashMap<>(offset.size());
+                for(Map.Entry<String, ?> kv: offset.entrySet()){
+                    Object v = kv.getValue();
+                    result.put(kv.getKey(), v instanceof Integer ? ((Integer) v).longValue() : v);
+                }
+                return result;
+            }
+ };
+ }
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java
new file mode 100644
index 0000000..2113fcb
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java
@@ -0,0 +1,34 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class ConfigUtil {
+
+    public static Map<String, String> keyValueConfigToMap(KeyValue keyValueConfig){
+ if(keyValueConfig == null){
+ return null;
+ }
+
+ Set<String> configKeySet = keyValueConfig.keySet();
+ Map<String, String> mapConfig = new HashMap<>(configKeySet.size());
+        configKeySet.forEach(key -> mapConfig.put(key, keyValueConfig.getString(key)));
+ return mapConfig;
+ }
+
+
+ public static KeyValue mapConfigToKeyValue(Map<String, String> mapConfig){
+ if(mapConfig == null){
+ return null;
+ }
+
+ KeyValue keyValue = new DefaultKeyValue();
+ mapConfig.forEach((k, v)-> keyValue.put(k, v));
+
+ return keyValue;
+ }
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/KafkaPluginsUtil.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/KafkaPluginsUtil.java
new file mode 100644
index 0000000..65a324d
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/KafkaPluginsUtil.java
@@ -0,0 +1,25 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class KafkaPluginsUtil {
+
+
+ public static final String PLUGIN_PATH = "plugin.path";
+ private static final Map<String, Plugins> CACHE = new HashMap<>();
+
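+    // Plugins instances are cached per plugin.path so the connector and its tasks reuse one set
+    // of isolated plugin classloaders instead of re-scanning the path on every call.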
+ public static Plugins getPlugins(Map<String, String> props){
+ String path = props.get(PLUGIN_PATH);
+ synchronized (CACHE){
+ Plugins plugins = CACHE.get(path);
+ if(plugins == null){
+ plugins = new Plugins(props);
+ CACHE.put(path, plugins);
+ }
+ return plugins;
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java
new file mode 100644
index 0000000..815ae56
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java
@@ -0,0 +1,157 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RecordUtil {
+
+ public static final String BROKER_NAME = "brokerName";
+ public static final String QUEUE_ID = "queueId";
+ public static final String TOPIC = "topic";
+ public static final String QUEUE_OFFSET = "queueOffset";
+
+
+ private static final String TOPIC_SEP = "@#@";
+
+
+ public static final String KAFKA_MSG_KEY = "kafka_key";
+    public static final String KAFKA_CONNECT_RECORD_TOPIC_KEY = "kafka_connect_record_topic";
+    public static final String KAFKA_CONNECT_RECORD_PARTITION_KEY = "kafka_connect_record_partition";
+    public static final String KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX = "kafka_connect_record_header_";
+
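+    // A RocketMQ queue is identified by (topic, brokerName, queueId). The adapter packs topic and
+    // brokerName into the Kafka TopicPartition's topic field as "topic@#@brokerName" (TOPIC_SEP)
+    // and uses queueId as the Kafka partition number; the helpers below convert in both directions.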
+    public static String getTopicAndBrokerName(RecordPartition recordPartition) {
+ return new StringBuilder()
+ .append(recordPartition.getPartition().get(TOPIC))
+ .append(TOPIC_SEP)
+ .append(recordPartition.getPartition().get(BROKER_NAME))
+ .toString();
+ }
+
+    public static Map<String, String> getPartitionMap(String topicAndBrokerName) {
+ String[] split = topicAndBrokerName.split(TOPIC_SEP);
+ Map<String, String> map = new HashMap<>();
+ map.put(TOPIC, split[0]);
+ map.put(BROKER_NAME, split[1]);
+
+ return map;
+ }
+
+ public static long getOffset(RecordOffset recordOffset){
+ return Long.valueOf(
+ (String) recordOffset.getOffset().get(QUEUE_OFFSET)
+ );
+ }
+
+ public static int getPartition(RecordPartition recordPartition){
+ return Integer.valueOf(
+ (String) recordPartition.getPartition().get(QUEUE_ID)
+ );
+ }
+
+    public static TopicPartition recordPartitionToTopicPartition(RecordPartition recordPartition){
+ String topicAndBrokerName = getTopicAndBrokerName(recordPartition);
+ int partition = getPartition(recordPartition);
+ return new TopicPartition(topicAndBrokerName, partition);
+ }
+
+    public static RecordPartition topicPartitionToRecordPartition(TopicPartition topicPartition){
+        Map<String, String> map = RecordUtil.getPartitionMap(topicPartition.topic());
+ map.put(RecordUtil.QUEUE_ID, topicPartition.partition() + "");
+ return new RecordPartition(map);
+ }
+
+
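+    // Mapping between Kafka SourceRecords and RocketMQ ConnectRecords: the value is carried as a
+    // UTF-8 string body, while the key, headers, and the original Kafka topic/partition travel as
+    // record extensions so they can be restored losslessly in toSourceRecord below.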
+    public static ConnectRecord toConnectRecord(SourceRecord sourceRecord, Converter keyConverter, Converter valueConverter,
+                                                HeaderConverter headerConverter){
+        RecordPartition recordPartition = new RecordPartition(new HashMap<>(sourceRecord.sourcePartition()));
+        RecordOffset recordOffset = new RecordOffset(new HashMap<>(sourceRecord.sourceOffset()));
+        Long timestamp = sourceRecord.timestamp();
+
+        byte[] value = valueConverter.fromConnectData(
+                sourceRecord.topic(), sourceRecord.valueSchema(), sourceRecord.value()
+        );
+
+        ConnectRecord connectRecord = new ConnectRecord(
+                recordPartition, recordOffset, timestamp,
+                SchemaBuilder.string().build(), new String(value, StandardCharsets.UTF_8)
+        );
+
+        if(sourceRecord.key() != null) {
+            byte[] key = keyConverter.fromConnectData(
+                    sourceRecord.topic(), sourceRecord.keySchema(), sourceRecord.key()
+            );
+            connectRecord.addExtension(RecordUtil.KAFKA_MSG_KEY, new String(key, StandardCharsets.UTF_8));
+        }
+
+        for(Header header: sourceRecord.headers()){
+            byte[] headerValue = headerConverter.fromConnectHeader(
+                    sourceRecord.topic(), header.key(), header.schema(), header.value()
+            );
+            connectRecord.addExtension(RecordUtil.KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX + header.key(), new String(headerValue, StandardCharsets.UTF_8));
+        }
+
+        if(sourceRecord.topic() != null){
+            connectRecord.addExtension(RecordUtil.KAFKA_CONNECT_RECORD_TOPIC_KEY, sourceRecord.topic());
+        }
+        if(sourceRecord.kafkaPartition() != null){
+            connectRecord.addExtension(RecordUtil.KAFKA_CONNECT_RECORD_PARTITION_KEY, sourceRecord.kafkaPartition().toString());
+        }
+        return connectRecord;
+ }
+
+
+    public static SourceRecord toSourceRecord(ConnectRecord connectRecord, Converter keyConverter, Converter valueConverter,
+                                              HeaderConverter headerConverter){
+        Map<String, ?> sourcePartition = new HashMap<>(connectRecord.getPosition().getPartition().getPartition());
+        Map<String, ?> sourceOffset = new HashMap<>(connectRecord.getPosition().getOffset().getOffset());
+        String topic = connectRecord.getExtension(RecordUtil.KAFKA_CONNECT_RECORD_TOPIC_KEY);
+        String partitionStr = connectRecord.getExtension(RecordUtil.KAFKA_CONNECT_RECORD_PARTITION_KEY);
+        Integer partition = null;
+        if(partitionStr != null){
+            partition = Integer.valueOf(partitionStr);
+        }
+        String keyStr = connectRecord.getExtension(RecordUtil.KAFKA_MSG_KEY);
+        Schema keySchema = null;
+        Object key = null;
+        if(keyStr != null){
+            SchemaAndValue keySchemaAndValue = keyConverter.toConnectData(topic, keyStr.getBytes(StandardCharsets.UTF_8));
+            keySchema = keySchemaAndValue.schema();
+            key = keySchemaAndValue.value();
+        }
+
+        ConnectHeaders headers = new ConnectHeaders();
+        for(String extKey: connectRecord.getExtensions().keySet()){
+            if(!extKey.startsWith(RecordUtil.KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX)){
+                continue;
+            }
+            String header = extKey.substring(RecordUtil.KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX.length());
+            SchemaAndValue headerSchemaAndValue = headerConverter
+                    .toConnectHeader(topic, header, connectRecord.getExtension(extKey).getBytes(StandardCharsets.UTF_8));
+
+            headers.add(header, headerSchemaAndValue);
+        }
+
+        // note: the record value must go through the value converter, not the key converter
+        SchemaAndValue valueSchemaAndValue = valueConverter.toConnectData(topic, ((String) connectRecord.getData()).getBytes(StandardCharsets.UTF_8));
+        SourceRecord sourceRecord = new SourceRecord(
+                sourcePartition, sourceOffset, topic, partition,
+                keySchema, key, valueSchemaAndValue.schema(), valueSchemaAndValue.value(),
+                connectRecord.getTimestamp(), headers
+        );
+        return sourceRecord;
+ }
+
+}