This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 8dcf2b37 [ISSUE #523 & OSPP2023] Feature: support archetype to create
connectors (#525)
8dcf2b37 is described below
commit 8dcf2b37f247e2261512df8e00df7f875d4190d0
Author: Ao Qiao <[email protected]>
AuthorDate: Tue Oct 17 18:05:20 2023 +0800
[ISSUE #523 & OSPP2023] Feature: support archetype to create connectors
(#525)
* initial commit
* commit
* rename
* finish source connector
* finish sink connector
* add license
* Update README.md
* Update connect-standalone.conf
* support archetype
* Delete README.md
---
.../README.md | 36 ++++
.../rocketmq-connect-connectors-archetype/pom.xml | 37 ++++
.../META-INF/maven/archetype-metadata.xml | 66 ++++++++
.../main/resources/archetype-resources/README.md | 55 ++++++
.../src/main/resources/archetype-resources/pom.xml | 188 +++++++++++++++++++++
.../java/config/__dbNameToCamel__BaseConfig.java | 131 ++++++++++++++
.../java/config/__dbNameToCamel__Constants.java | 50 ++++++
.../java/config/__dbNameToCamel__SinkConfig.java | 34 ++++
.../java/config/__dbNameToCamel__SourceConfig.java | 45 +++++
.../java/helper/__dbNameToCamel__HelperClient.java | 72 ++++++++
.../main/java/helper/__dbNameToCamel__Record.java | 25 +++
.../java/sink/__dbNameToCamel__SinkConnector.java | 60 +++++++
.../main/java/sink/__dbNameToCamel__SinkTask.java | 69 ++++++++
.../source/__dbNameToCamel__SourceConnector.java | 60 +++++++
.../java/source/__dbNameToCamel__SourceTask.java | 167 ++++++++++++++++++
.../java/sink/__dbNameToCamel__SinkTaskTest.java | 86 ++++++++++
.../source/__dbNameToCamel__SourceTaskTest.java | 28 +++
.../resources/projects/basic/archetype.properties | 11 ++
.../src/test/resources/projects/basic/goal.txt | 0
19 files changed, 1220 insertions(+)
diff --git a/connectors/rocketmq-connect-connectors-archetype/README.md
b/connectors/rocketmq-connect-connectors-archetype/README.md
new file mode 100644
index 00000000..e9fee6f0
--- /dev/null
+++ b/connectors/rocketmq-connect-connectors-archetype/README.md
@@ -0,0 +1,36 @@
+## How to Use Connector-Archetype
+
+1. 进入脚手架文件夹
+
+ ```shell
+ cd rocketmq-connect-connectors-archetype/
+ ```
+
+2. 将脚手架安装到本地
+
+ ```shell
+ mvn -e clean install
+ ```
+
+3. 创建connector模版工程
+
+ ```shell
+ cd connectors/
+ mvn archetype:generate \
+ -DarchetypeGroupId=org.apache.rocketmq \
+ -DarchetypeArtifactId=rocketmq-connect-connectors-archetype \
+ -DarchetypeVersion=1.0-SNAPSHOT \
+ -DdatabaseName=<databasename>
+ ```
+
+ 例:创建Clickhouse-Connector
+
+ ```shell
+ mvn archetype:generate \
+ -DarchetypeGroupId=org.apache.rocketmq \
+ -DarchetypeArtifactId=rocketmq-connect-connectors-archetype \
+ -DarchetypeVersion=1.0-SNAPSHOT \
+ -DdatabaseName=clickhouse
+ ```
+
+4. 如上指令将创建一个connector的框架,开发者主要关心`helper/xxxHelperClient`以及`xxxSourceTask`,`xxxSinkTask`的实现即可,剩余配置可以按需修改。
diff --git a/connectors/rocketmq-connect-connectors-archetype/pom.xml
b/connectors/rocketmq-connect-connectors-archetype/pom.xml
new file mode 100644
index 00000000..6081f655
--- /dev/null
+++ b/connectors/rocketmq-connect-connectors-archetype/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-connectors-archetype</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <packaging>maven-archetype</packaging>
+
+ <name>rocketmq-connect-connectors-archetype</name>
+
+ <build>
+ <extensions>
+ <extension>
+ <groupId>org.apache.maven.archetype</groupId>
+ <artifactId>archetype-packaging</artifactId>
+ <version>3.2.1</version>
+ </extension>
+ </extensions>
+
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-archetype-plugin</artifactId>
+ <version>3.2.1</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+</project>
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 00000000..ba974e82
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<archetype-descriptor
xsi:schemaLocation="https://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.1.0
http://maven.apache.org/xsd/archetype-descriptor-1.1.0.xsd"
name="rocketmq-connect-connectors-archetype"
+
xmlns="https://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+ <requiredProperties>
+ <!--必填属性-->
+ <requiredProperty key="groupId">
+ <defaultValue>org.apache.rocketmq</defaultValue>
+ </requiredProperty>
+ <requiredProperty key="artifactId">
+ <defaultValue>rocketmq-connect-${dbNameToLowerCase}</defaultValue>
+ </requiredProperty>
+ <requiredProperty key="version">
+ <defaultValue>1.0.0-SNAPSHOT</defaultValue>
+ </requiredProperty>
+ <requiredProperty key="package">
+
<defaultValue>org.apache.rocketmq.connect.${dbNameToLowerCase}</defaultValue>
+ </requiredProperty>
+
+ <!-- The database to connect -->
+ <requiredProperty key="databaseName"/>
+
+ <requiredProperty key="dbNameToUpperCase" >
+ <defaultValue>${databaseName.toUpperCase()}</defaultValue>
+ </requiredProperty>
+
+ <requiredProperty key="dbNameToCamel" >
+
<defaultValue>${databaseName.toLowerCase().substring(0,1).toUpperCase()}${databaseName.toLowerCase().substring(1)}</defaultValue>
+ </requiredProperty>
+
+ <requiredProperty key="dbNameToLowerCase" >
+ <defaultValue>${databaseName.toLowerCase()}</defaultValue>
+ </requiredProperty>
+ </requiredProperties>
+
+ <fileSets>
+ <fileSet filtered="true" encoding="UTF-8">
+ <directory/>
+ <includes>
+ <include>.reviewboardrc</include>
+ <include>README.md</include>
+ </includes>
+ </fileSet>
+ <fileSet encoding="UTF-8">
+ <directory/>
+ <includes>
+ <include>.gitignore</include>
+ <include>TODO.md</include>
+ </includes>
+ </fileSet>
+ <fileSet encoding="UTF-8">
+ <directory/>
+ <includes>
+ <include>.gitignore</include>
+ </includes>
+ </fileSet>
+ <fileSet filtered="true" packaged="true" encoding="UTF-8">
+ <directory>src/main/java</directory>
+ </fileSet>
+ <fileSet filtered="true" packaged="true" encoding="UTF-8">
+ <directory>src/test/java</directory>
+ </fileSet>
+ </fileSets>
+
+</archetype-descriptor>
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/README.md
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/README.md
new file mode 100644
index 00000000..2cd7f28b
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/README.md
@@ -0,0 +1,55 @@
+## FIXME: fix document
+
+##### ${dbNameToCamel}SourceConnector fully-qualified name
+
+org.apache.rocketmq.connect.${dbNameToLowerCase}.source.${dbNameToCamel}SourceConnector
+
+**${dbNameToLowerCase}-source-connector** start
+
+```
+POST
http://${runtime-ip}:${runtime-port}/connectors/${dbNameToLowerCase}SourceConnector
+{
+
"connector.class":"org.apache.rocketmq.connect.${dbNameToLowerCase}.source.${dbNameToCamel}SourceConnector",
+ "${dbNameToLowerCase}host":"localhost",
+ "${dbNameToLowerCase}port":8123,
+ "database":"default",
+ "username":"default",
+ "password":"123456",
+ "table":"tableName",
+ "topic":"test${dbNameToCamel}Topic",
+
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
+
+##### ${dbNameToCamel}SinkConnector fully-qualified name
+
+org.apache.rocketmq.connect.${dbNameToLowerCase}.sink.${dbNameToCamel}SinkConnector
+
+**${dbNameToLowerCase}-sink-connector** start
+
+```
+POST
http://${runtime-ip}:${runtime-port}/connectors/${dbNameToLowerCase}SinkConnector
+{
+
"connector.class":"org.apache.rocketmq.connect.${dbNameToLowerCase}.sink.${dbNameToCamel}SinkConnector",
+ "${dbNameToLowerCase}host":"localhost",
+ "${dbNameToLowerCase}port":8123,
+ "database":"clickhouse",
+ "username":"default",
+ "password":"123456",
+ "connect.topicnames":"test${dbNameToCamel}Topic",
+
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
+
+##### parameter configuration
+
+| parameter | effect
| required | default |
+|--------------------------|---------------------------------------------------|-------------------|---------|
+| ${dbNameToLowerCase}host | The Host of the ${dbNameToCamel} server
| yes | null |
+| ${dbNameToLowerCase}port | The Port of the ${dbNameToCamel} server
| yes | null |
+| database | The database to read or write
| yes | null |
+| table | The source table to read
| yes (source only) | null |
+| topic | RocketMQ topic for source connector to write into
| yes (source only) | null |
+| connect.topicnames | RocketMQ topic for sink connector to read from
| yes (sink only) | null |
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/pom.xml
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 00000000..c948d03b
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,188 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor
+ license agreements. See the NOTICE file distributed with this work for
additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not
use
+ this file except in compliance with the License. You may obtain a copy
of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required
+ by applicable law or agreed to in writing, software distributed under
the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License.
--><project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>${groupId}</groupId>
+ <artifactId>${artifactId}</artifactId>
+ <version>${version}</version>
+
+ <name>connect-${dbNameToLowerCase}</name>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <issueManagement>
+ <system>jira</system>
+ <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+ </issueManagement>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <version>0.12</version>
+ <configuration>
+ <excludes>
+ <exclude>README.md</exclude>
+ <exclude>README-CN.md</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>0.1.4</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <!-- FIXME: Write dependencies here-->
+
+ <dependency>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ <version>1.8.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.83</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>RELEASE</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__BaseConfig.java
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__BaseConfig.java
new file mode 100644
index 00000000..c13c241c
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__BaseConfig.java
@@ -0,0 +1,131 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.config;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+
+/**
+ * Connection settings shared by the ${dbNameToCamel} source and sink configurations.
+ *
+ * <p>The values are filled in reflectively by {@code load(KeyValue)}: each public
+ * setter is paired with the configuration key obtained by dropping the {@code set}
+ * prefix from its name and lower-casing the remainder.
+ */
+public class ${dbNameToCamel}BaseConfig {
+
+    private String ${databaseName}Host;
+
+    private Integer ${databaseName}Port;
+
+    private String database;
+
+    private String userName;
+
+    private String passWord;
+
+    private String topic;
+
+    /** @return the configured server host, or {@code null} when it was never set */
+    public String get${dbNameToCamel}Host() {
+        return ${databaseName}Host;
+    }
+
+    public void set${dbNameToCamel}Host(String ${databaseName}Host) {
+        this.${databaseName}Host = ${databaseName}Host;
+    }
+
+    /** @return the configured server port, or {@code null} when it was never set */
+    public Integer get${dbNameToCamel}Port() {
+        return ${databaseName}Port;
+    }
+
+    public void set${dbNameToCamel}Port(Integer ${databaseName}Port) {
+        this.${databaseName}Port = ${databaseName}Port;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassWord() {
+        return passWord;
+    }
+
+    public void setPassWord(String passWord) {
+        this.passWord = passWord;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * Copies every matching entry of {@code props} into this object through its setters.
+     *
+     * @param props connector configuration to read from
+     */
+    public void load(KeyValue props) {
+        properties2Object(props, this);
+    }
+
+    /**
+     * Invokes each public {@code setXxx} method of {@code object} whose lower-cased
+     * suffix is a key of {@code p}, converting the string value to the type of the
+     * setter's first parameter.
+     *
+     * <p>Setters whose first parameter is not an int, long, double, boolean, float
+     * (primitive or boxed) or a String are skipped. Any failure while reading,
+     * converting or invoking is ignored and the loop moves on to the next setter.
+     *
+     * @param p      key/value configuration source
+     * @param object target whose setters are invoked
+     */
+    private void properties2Object(final KeyValue p, final Object object) {
+        for (Method setter : object.getClass().getMethods()) {
+            String setterName = setter.getName();
+            if (!setterName.startsWith("set")) {
+                continue;
+            }
+            try {
+                // "setUserName" -> "username"
+                String configKey = setterName.substring(3).toLowerCase();
+                String rawValue = p.getString(configKey);
+                if (rawValue == null) {
+                    continue;
+                }
+
+                Class<?>[] parameterTypes = setter.getParameterTypes();
+                if (parameterTypes == null || parameterTypes.length == 0) {
+                    continue;
+                }
+
+                Object converted;
+                switch (parameterTypes[0].getSimpleName()) {
+                    case "int":
+                    case "Integer":
+                        converted = Integer.parseInt(rawValue);
+                        break;
+                    case "long":
+                    case "Long":
+                        converted = Long.parseLong(rawValue);
+                        break;
+                    case "double":
+                    case "Double":
+                        converted = Double.parseDouble(rawValue);
+                        break;
+                    case "boolean":
+                    case "Boolean":
+                        converted = Boolean.parseBoolean(rawValue);
+                        break;
+                    case "float":
+                    case "Float":
+                        converted = Float.parseFloat(rawValue);
+                        break;
+                    case "String":
+                        converted = rawValue;
+                        break;
+                    default:
+                        // Unsupported parameter type: leave this setter untouched.
+                        continue;
+                }
+                setter.invoke(object, converted);
+            } catch (Throwable ignored) {
+                // Failures (unparsable number, setter that throws, ...) are ignored;
+                // the corresponding field simply keeps its previous value.
+            }
+        }
+    }
+}
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__Constants.java
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__Constants.java
new file mode 100644
index 00000000..0973299d
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__Constants.java
@@ -0,0 +1,50 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.config;
+
+/**
+ * Configuration keys and default values used by the ${dbNameToCamel} connector classes.
+ */
+public class ${dbNameToCamel}Constants {
+
+    /** Configuration key holding the server host. */
+    public static final String ${dbNameToUpperCase}_HOST = "${dbNameToLowerCase}host";
+
+    /** Configuration key holding the server port. */
+    public static final String ${dbNameToUpperCase}_PORT = "${dbNameToLowerCase}port";
+
+    /** Configuration key holding the database name. */
+    public static final String ${dbNameToUpperCase}_DATABASE = "database";
+
+    /** Configuration key holding the login user name. */
+    public static final String ${dbNameToUpperCase}_USERNAME = "username";
+
+    /** Configuration key holding the login password. */
+    public static final String ${dbNameToUpperCase}_PASSWORD = "password";
+
+    /** Configuration key holding the table name. */
+    public static final String ${dbNameToUpperCase}_TABLE = "table";
+
+    /** Configuration key holding the RocketMQ topic. */
+    public static final String TOPIC = "topic";
+
+    // NOTE(review): OFFSET and PARTITION are presumably the keys used for the source
+    // task's record offset / partition maps - confirm against the source task.
+    public static final String ${dbNameToUpperCase}_OFFSET = "OFFSET";
+
+    public static final String ${dbNameToUpperCase}_PARTITION = "${dbNameToUpperCase}_PARTITION";
+
+    /** Default timeout, in seconds. */
+    public static final Integer defaultTimeoutSeconds = 30;
+
+    /** Number of milliseconds in one second. */
+    public static final int MILLI_IN_A_SEC = 1000;
+
+    /** Default retry count. */
+    public static final Integer retryCountDefault = 3;
+
+    // NOTE(review): presumably the number of rows fetched per query - confirm at the call site.
+    public static final int BATCH_SIZE = 2000;
+
+}
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SinkConfig.java
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SinkConfig.java
new file mode 100644
index 00000000..e0a2d632
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SinkConfig.java
@@ -0,0 +1,34 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.config;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration of the ${dbNameToCamel} sink connector.
+ */
+public class ${dbNameToCamel}SinkConfig extends ${dbNameToCamel}BaseConfig {
+
+    /**
+     * Configuration keys that must be present for the sink connector to start.
+     *
+     * <p>Populated in a static initializer instead of double-brace initialization,
+     * so the set is a plain {@link HashSet} rather than an anonymous subclass.
+     */
+    public static final Set<String> SINK_REQUEST_CONFIG = new HashSet<>();
+
+    static {
+        SINK_REQUEST_CONFIG.add(${dbNameToCamel}Constants.${dbNameToUpperCase}_HOST);
+        SINK_REQUEST_CONFIG.add(${dbNameToCamel}Constants.${dbNameToUpperCase}_PORT);
+        // FIXME: add config you need
+    }
+}
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SourceConfig.java
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SourceConfig.java
new file mode 100644
index 00000000..4f02861e
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SourceConfig.java
@@ -0,0 +1,45 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.config;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration of the ${dbNameToCamel} source connector; adds the source table
+ * to the settings inherited from ${dbNameToCamel}BaseConfig.
+ */
+public class ${dbNameToCamel}SourceConfig extends ${dbNameToCamel}BaseConfig {
+
+    /**
+     * Configuration keys required by the source connector.
+     *
+     * <p>Populated in a static initializer instead of double-brace initialization,
+     * so the set is a plain {@link HashSet} rather than an anonymous subclass.
+     * NOTE(review): presumably validated when the source connector starts - confirm.
+     */
+    public static final Set<String> REQUEST_CONFIG = new HashSet<>();
+
+    static {
+        REQUEST_CONFIG.add(${dbNameToCamel}Constants.${dbNameToUpperCase}_HOST);
+        REQUEST_CONFIG.add(${dbNameToCamel}Constants.${dbNameToUpperCase}_PORT);
+        REQUEST_CONFIG.add(${dbNameToCamel}Constants.${dbNameToUpperCase}_TABLE);
+        REQUEST_CONFIG.add(${dbNameToCamel}Constants.TOPIC);
+    }
+
+    private String table;
+
+    /** @return the configured table name, or {@code null} when it was never set */
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+}
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__HelperClient.java
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__HelperClient.java
new file mode 100644
index 00000000..38623c36
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__HelperClient.java
@@ -0,0 +1,72 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.helper;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import ${package}.config.${dbNameToCamel}Constants;
+import ${package}.config.${dbNameToCamel}BaseConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Skeleton of the client used to talk to ${dbNameToCamel}.
+ *
+ * <p>Every operation is a placeholder that throws {@link RuntimeException} until the
+ * connector author fills it in. Because the constructor calls {@code initConnection()},
+ * constructing an instance also throws until that method is implemented.
+ */
+public class ${dbNameToCamel}HelperClient {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(${dbNameToCamel}HelperClient.class);
+
+    private ${dbNameToCamel}BaseConfig config;
+    /** Default timeout in milliseconds (default seconds multiplied by 1000). */
+    private int timeout = ${dbNameToCamel}Constants.defaultTimeoutSeconds * ${dbNameToCamel}Constants.MILLI_IN_A_SEC;
+    private int retry = ${dbNameToCamel}Constants.retryCountDefault;
+
+    /**
+     * Stores the configuration and immediately initializes the connection.
+     *
+     * @param config connection settings for this client
+     */
+    public ${dbNameToCamel}HelperClient(${dbNameToCamel}BaseConfig config) {
+        this.config = config;
+        initConnection();
+    }
+
+    /** Opens the connection described by the stored configuration. */
+    public void initConnection() {
+        // FIXME: Write your code here
+        throw new RuntimeException("Method not implemented");
+    }
+
+    /** @return whether the server can be reached */
+    public boolean ping() {
+        // FIXME: Write your code here
+        throw new RuntimeException("Method not implemented");
+    }
+
+    /**
+     * Reads a batch of records.
+     *
+     * @param offset    position to start reading from
+     * @param batchSize maximum number of records to return
+     * @return the records read
+     */
+    public List<?> query(long offset, int batchSize) {
+        // FIXME: Write your code here
+        throw new RuntimeException("Method not implemented");
+    }
+
+    /**
+     * Writes a batch of records.
+     *
+     * <p>The parameter is now named: the template previously declared
+     * {@code batchInsert(List<?>)} without an identifier, which is not valid Java
+     * and made every generated project fail to compile.
+     *
+     * @param records records to insert
+     */
+    public void batchInsert(List<?> records) {
+        // FIXME: Write your code here
+        throw new RuntimeException("Method not implemented");
+    }
+
+    /** @return whether the client was stopped successfully */
+    public boolean stop() {
+        // FIXME: Write your code here
+        throw new RuntimeException("Method not implemented");
+    }
+}
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__Record.java
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__Record.java
new file mode 100644
index 00000000..fa5c3106
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__Record.java
@@ -0,0 +1,25 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.helper;
+
+public class ${dbNameToCamel}Record {
+ // FIXME: Write your code here. NOTE(review): presumably this class models one record exchanged with ${dbNameToCamel}HelperClient (query / batchInsert) - confirm before adding fields.
+}
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkConnector.java
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkConnector.java
new file mode 100644
index 00000000..a9b23a89
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkConnector.java
@@ -0,0 +1,60 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import java.util.ArrayList;
+import java.util.List;
+import ${package}.config.${dbNameToCamel}SinkConfig;
+
+/**
+ * Sink connector entry point: validates the connector configuration once and hands the
+ * same configuration object to every task it is asked to create.
+ */
+public class ${dbNameToCamel}SinkConnector extends SinkConnector {
+
+    /** Configuration captured by start and cleared again by stop. */
+    private KeyValue keyValue;
+
+    /**
+     * Verifies that every key listed in SINK_REQUEST_CONFIG is present and then keeps the
+     * configuration for later task creation.
+     *
+     * @param value configuration supplied for this connector
+     * @throws RuntimeException naming the first listed key that is absent
+     */
+    @Override
+    public void start(KeyValue value) {
+        for (String requiredKey : ${dbNameToCamel}SinkConfig.SINK_REQUEST_CONFIG) {
+            if (value.containsKey(requiredKey)) {
+                continue;
+            }
+            throw new RuntimeException("Request config key: " + requiredKey);
+        }
+        this.keyValue = value;
+    }
+
+    /**
+     * Builds one configuration entry per task; every entry is the stored configuration.
+     *
+     * @param maxTasks number of entries to produce; values below one yield an empty list
+     * @return a new list holding the stored configuration maxTasks times
+     */
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> perTaskConfigs = new ArrayList<>();
+        for (int remaining = maxTasks; remaining > 0; remaining--) {
+            perTaskConfigs.add(this.keyValue);
+        }
+        return perTaskConfigs;
+    }
+
+    /** @return the task implementation that belongs to this connector */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return ${dbNameToCamel}SinkTask.class;
+    }
+
+    /** Forgets the configuration captured by start. */
+    @Override
+    public void stop() {
+        this.keyValue = null;
+    }
+}
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkTask.java
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkTask.java
new file mode 100644
index 00000000..a4ee54a2
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkTask.java
@@ -0,0 +1,69 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.sink;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import ${package}.helper.${dbNameToCamel}HelperClient;
+import ${package}.config.${dbNameToCamel}SinkConfig;
+
+/**
+ * Sink task skeleton: loads the sink configuration, opens a helper client and receives
+ * the records that have to be written to the target system.
+ */
+public class ${dbNameToCamel}SinkTask extends SinkTask {
+
+    /** Sink configuration, populated by start. Left public as in the original template. */
+    public ${dbNameToCamel}SinkConfig config;
+
+    /** Client for the target system; null before start and after stop. */
+    private ${dbNameToCamel}HelperClient helperClient;
+
+    /**
+     * Writes the given records to the target system.
+     *
+     * <p>The template only extracts the table name, field list and payload of each record;
+     * the actual write is left to the implementer, so the first record currently raises
+     * an exception.
+     *
+     * @param sinkRecords records to write; null or empty input is ignored
+     * @throws ConnectException declared by the connector API
+     * @throws RuntimeException until the write logic has been implemented
+     */
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        if (sinkRecords == null || sinkRecords.isEmpty()) {
+            return;
+        }
+        for (ConnectRecord record : sinkRecords) {
+            String table = record.getSchema().getName();
+            final List<Field> fields = record.getSchema().getFields();
+            final Struct structData = (Struct) record.getData();
+
+            // FIXME: Write your code here
+            throw new RuntimeException("Method not implemented");
+        }
+    }
+
+    /**
+     * Loads the configuration, creates the helper client and checks connectivity.
+     *
+     * @param keyValue task configuration
+     * @throws RuntimeException when the ping against the server fails
+     */
+    @Override
+    public void start(KeyValue keyValue) {
+        this.config = new ${dbNameToCamel}SinkConfig();
+        this.config.load(keyValue);
+        // The field is assigned before the ping so that a later stop can still release the client.
+        this.helperClient = new ${dbNameToCamel}HelperClient(this.config);
+        if (!helperClient.ping()) {
+            throw new RuntimeException("Cannot connect to ${dbNameToLowerCase} server!");
+        }
+    }
+
+    /**
+     * Releases the helper client. Safe to call when start never ran or did not get as far
+     * as creating the client, and safe to call more than once.
+     */
+    @Override
+    public void stop() {
+        if (this.helperClient == null) {
+            return;
+        }
+        this.helperClient.stop();
+        this.helperClient = null;
+    }
+}
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceConnector.java
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceConnector.java
new file mode 100644
index 00000000..d1a5daf5
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceConnector.java
@@ -0,0 +1,60 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
+import ${package}.config.${dbNameToCamel}SourceConfig;
+
+/**
+ * Source connector entry point: validates the connector configuration once and hands the
+ * same configuration object to every task it is asked to create.
+ */
+public class ${dbNameToCamel}SourceConnector extends SourceConnector {
+
+    /** Configuration captured by start and cleared again by stop. */
+    private KeyValue keyValue;
+
+    /**
+     * Verifies that every key listed in REQUEST_CONFIG is present and then keeps the
+     * configuration for later task creation.
+     *
+     * @param config configuration supplied for this connector
+     * @throws RuntimeException naming the first listed key that is absent
+     */
+    @Override
+    public void start(KeyValue config) {
+        for (String requiredKey : ${dbNameToCamel}SourceConfig.REQUEST_CONFIG) {
+            if (config.containsKey(requiredKey)) {
+                continue;
+            }
+            throw new RuntimeException("Request config key: " + requiredKey);
+        }
+        this.keyValue = config;
+    }
+
+    /**
+     * Builds one configuration entry per task; every entry is the stored configuration.
+     *
+     * @param maxTasks number of entries to produce; values below one yield an empty list
+     * @return a new list holding the stored configuration maxTasks times
+     */
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> perTaskConfigs = new ArrayList<>();
+        for (int remaining = maxTasks; remaining > 0; remaining--) {
+            perTaskConfigs.add(this.keyValue);
+        }
+        return perTaskConfigs;
+    }
+
+    /** @return the task implementation that belongs to this connector */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return ${dbNameToCamel}SourceTask.class;
+    }
+
+    /** Forgets the configuration captured by start. */
+    @Override
+    public void stop() {
+        this.keyValue = null;
+    }
+}
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceTask.java
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceTask.java
new file mode 100644
index 00000000..3a0e3cb4
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceTask.java
@@ -0,0 +1,167 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${package}.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import ${package}.helper.${dbNameToCamel}HelperClient;
+import ${package}.helper.${dbNameToCamel}Record;
+import ${package}.config.${dbNameToCamel}Constants;
+import ${package}.config.${dbNameToCamel}SourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Source task skeleton: reads the last stored offset, asks the helper client for a batch
+ * of records and converts each of them into a connect record.
+ *
+ * <p>Places marked FIXME have to be completed for the concrete data source.
+ */
+public class ${dbNameToCamel}SourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(${dbNameToCamel}SourceTask.class);
+
+    /** Source configuration, populated by start. */
+    private ${dbNameToCamel}SourceConfig config;
+
+    /** Client for the data source; null before start and after stop. */
+    private ${dbNameToCamel}HelperClient helperClient;
+
+    /**
+     * Fetches one batch starting at the stored offset and converts it.
+     *
+     * <p>A plain loop is used instead of a stream so that the checked exceptions of the
+     * conversion can be handled without an extra import.
+     *
+     * @return the converted records; empty when the client returns nothing
+     * @throws RuntimeException wrapping a reflective failure raised during conversion
+     */
+    @Override
+    public List<ConnectRecord> poll() {
+        List<ConnectRecord> res = new ArrayList<>();
+        long offset = readRecordOffset();
+        List<${dbNameToCamel}Record> recordList =
+            helperClient.query(offset, ${dbNameToCamel}Constants.BATCH_SIZE);
+        if (recordList == null) {
+            return res;
+        }
+        // FIXME: Write your code here. Every record of the batch currently carries the
+        // offset the batch was read from; derive a per-record offset for your data source.
+        for (${dbNameToCamel}Record record : recordList) {
+            try {
+                res.add(${dbNameToLowerCase}Record2ConnectRecord(record, offset));
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                throw new RuntimeException("Cannot convert ${dbNameToLowerCase} record", e);
+            }
+        }
+        return res;
+    }
+
+    /** Key under which the offset of the configured table is stored. */
+    private String offsetKey() {
+        return config.getTable() + "_" + ${dbNameToCamel}Constants.${dbNameToUpperCase}_OFFSET;
+    }
+
+    /**
+     * Looks up the offset stored for the configured table.
+     *
+     * @return the stored offset, or 0 when nothing has been stored yet
+     */
+    private long readRecordOffset() {
+        final RecordOffset positionInfo = this.sourceTaskContext.offsetStorageReader()
+            .readOffset(buildRecordPartition(config.getTable()));
+        if (positionInfo == null) {
+            return 0;
+        }
+        Object offset = positionInfo.getOffset().get(offsetKey());
+        return offset == null ? 0 : Long.parseLong(offset.toString());
+    }
+
+    /**
+     * Wraps one source record into a connect record carrying partition, offset, schema,
+     * payload and the topic extension.
+     *
+     * @param ${dbNameToLowerCase}Record record read from the data source
+     * @param offset offset to attach to the connect record
+     * @return the assembled connect record
+     */
+    private ConnectRecord ${dbNameToLowerCase}Record2ConnectRecord(
+        ${dbNameToCamel}Record ${dbNameToLowerCase}Record, long offset)
+        throws NoSuchFieldException, IllegalAccessException {
+        Schema schema = SchemaBuilder.struct().name(config.getTable()).build();
+        final List<Field> fields = buildFields(${dbNameToLowerCase}Record);
+        schema.setFields(fields);
+        final ConnectRecord connectRecord = new ConnectRecord(
+            buildRecordPartition(config.getTable()),
+            buildRecordOffset(offset),
+            System.currentTimeMillis(),
+            schema,
+            this.buildPayLoad(fields, schema, ${dbNameToLowerCase}Record));
+        connectRecord.setExtensions(this.buildExtensions(${dbNameToLowerCase}Record));
+        return connectRecord;
+    }
+
+    /**
+     * Describes the fields of a source record. The template returns an empty list.
+     */
+    private List<Field> buildFields(${dbNameToCamel}Record ${dbNameToLowerCase}Record)
+        throws NoSuchFieldException, IllegalAccessException {
+        List<Field> fields = new ArrayList<>();
+
+        // FIXME: Write your code here
+
+        return fields;
+    }
+
+    /** Builds the partition descriptor keyed by the partition constant. */
+    private RecordPartition buildRecordPartition(String partitionValue) {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put(${dbNameToCamel}Constants.${dbNameToUpperCase}_PARTITION, partitionValue);
+        return new RecordPartition(partitionMap);
+    }
+
+    /**
+     * Creates the payload struct for a record. The template leaves the struct empty.
+     */
+    private Struct buildPayLoad(List<Field> fields, Schema schema,
+        ${dbNameToCamel}Record ${dbNameToLowerCase}Record) {
+        Struct payLoad = new Struct(schema);
+        for (int i = 0; i < fields.size(); i++) {
+            // FIXME: Write your code here
+        }
+        return payLoad;
+    }
+
+    /**
+     * Builds the extension map holding the target topic. When no topic is configured the
+     * name falls back to the table name joined with the connector name.
+     */
+    private KeyValue buildExtensions(${dbNameToCamel}Record ${dbNameToLowerCase}Record) {
+        KeyValue keyValue = new DefaultKeyValue();
+        String topicName = config.getTopic();
+        if (topicName == null || topicName.isEmpty()) {
+            String connectorName = this.sourceTaskContext.getConnectorName();
+            topicName = config.getTable() + "_" + connectorName;
+        }
+        keyValue.put(${dbNameToCamel}Constants.TOPIC, topicName);
+        return keyValue;
+    }
+
+    /** Builds the offset descriptor for the configured table. */
+    private RecordOffset buildRecordOffset(long offset) {
+        Map<String, Long> offsetMap = new HashMap<>();
+        offsetMap.put(offsetKey(), offset);
+        return new RecordOffset(offsetMap);
+    }
+
+    /**
+     * Maps a Java value class to a connect schema. Classes without a dedicated mapping
+     * are described as strings.
+     *
+     * @param clazz class of the value to describe
+     * @return the matching schema
+     */
+    private static Schema getSchema(Class<?> clazz) {
+        if (clazz.equals(Byte.class)) {
+            return SchemaBuilder.int8().build();
+        } else if (clazz.equals(Short.class)) {
+            return SchemaBuilder.int16().build();
+        } else if (clazz.equals(Integer.class)) {
+            return SchemaBuilder.int32().build();
+        } else if (clazz.equals(Long.class)) {
+            return SchemaBuilder.int64().build();
+        } else if (clazz.equals(Float.class)) {
+            return SchemaBuilder.float32().build();
+        } else if (clazz.equals(Double.class)) {
+            return SchemaBuilder.float64().build();
+        } else if (clazz.equals(String.class)) {
+            return SchemaBuilder.string().build();
+        } else if (clazz.equals(Date.class)
+            || clazz.equals(LocalDateTime.class) || clazz.equals(LocalDate.class)) {
+            return SchemaBuilder.time().build();
+        } else if (clazz.equals(Timestamp.class)) {
+            return SchemaBuilder.timestamp().build();
+        } else if (clazz.equals(Boolean.class)) {
+            return SchemaBuilder.bool().build();
+        }
+        return SchemaBuilder.string().build();
+    }
+
+    /**
+     * Loads the configuration, creates the helper client and checks connectivity.
+     *
+     * @param keyValue task configuration
+     * @throws RuntimeException when the ping against the server fails
+     */
+    @Override
+    public void start(KeyValue keyValue) {
+        this.config = new ${dbNameToCamel}SourceConfig();
+        this.config.load(keyValue);
+        this.helperClient = new ${dbNameToCamel}HelperClient(this.config);
+        if (!helperClient.ping()) {
+            throw new RuntimeException("Cannot connect to ${dbNameToLowerCase} server!");
+        }
+    }
+
+    /**
+     * Stops the helper client before dropping the reference, mirroring the sink task.
+     * Safe to call when start never created a client.
+     */
+    @Override
+    public void stop() {
+        if (this.helperClient == null) {
+            return;
+        }
+        this.helperClient.stop();
+        this.helperClient = null;
+    }
+}
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/sink/__dbNameToCamel__SinkTaskTest.java
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/sink/__dbNameToCamel__SinkTaskTest.java
new file mode 100644
index 00000000..4f6d7f59
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/sink/__dbNameToCamel__SinkTaskTest.java
@@ -0,0 +1,86 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+package ${package}.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Manual smoke test for the generated sink task.
+ *
+ * <p>The whole sample is commented out so that the generated project builds before the
+ * task is implemented. It builds records for two schemas and pushes them through start
+ * and put; the connection constants are sample values to be replaced.
+ */
+class ${dbNameToCamel}SinkTaskTest {
+//
+// private static final String host = "127.0.0.1";
+// private static final String port = "8123";
+// private static final String db = "default";
+// private static final String username = "default";
+// private static final String password = "123456";
+//
+//
+//
+// public static void main(String[] args) {
+// List<ConnectRecord> records = new ArrayList<>();
+// // build schema
+// Schema schema = SchemaBuilder.struct()
+// .name("tableName")
+// .field("c1",SchemaBuilder.string().build())
+// .field("c2", SchemaBuilder.string().build())
+// .build();
+// // build record
+// String param0 = "1001";
+// Struct struct= new Struct(schema);
+// struct.put("c1",param0);
+// struct.put("c2",String.format("test-data-%s", param0));
+//
+// Schema schema2 = SchemaBuilder.struct()
+// .name("t1")
+// .field("c1",SchemaBuilder.string().build())
+// .field("c2", SchemaBuilder.string().build())
+// .build();
+// // build record (fill struct2, which belongs to schema2)
+// Struct struct2= new Struct(schema2);
+// struct2.put("c1",param0);
+// struct2.put("c2",String.format("test-data-%s", param0));
+//
+// for (int i = 0; i < 4; i++) {
+// ConnectRecord record = new ConnectRecord(
+// // offset partition
+// // offset partition
+// new RecordPartition(new ConcurrentHashMap<>()),
+// new RecordOffset(new HashMap<>()),
+// System.currentTimeMillis(),
+// schema,
+// struct
+// );
+// records.add(record);
+//
+// ConnectRecord record2 = new ConnectRecord(
+// // offset partition
+// // offset partition
+// new RecordPartition(new ConcurrentHashMap<>()),
+// new RecordOffset(new HashMap<>()),
+// System.currentTimeMillis(),
+// schema2,
+// struct2
+// );
+// records.add(record2);
+//
+// }
+//
+// ${dbNameToCamel}SinkTask task = new ${dbNameToCamel}SinkTask();
+// KeyValue config = new DefaultKeyValue();
+// task.start(config);
+// task.put(records);
+//
+// }
+
+}
\ No newline at end of file
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/source/__dbNameToCamel__SourceTaskTest.java
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/source/__dbNameToCamel__SourceTaskTest.java
new file mode 100644
index 00000000..66ec034b
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/source/__dbNameToCamel__SourceTaskTest.java
@@ -0,0 +1,28 @@
+#set( $symbol_pound = '#' )
+#set( $symbol_dollar = '$' )
+#set( $symbol_escape = '\' )
+package ${package}.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.List;
+import junit.framework.TestCase;
+import ${package}.config.${dbNameToCamel}Constants;
+
+import static java.lang.Thread.sleep;
+
+/**
+ * Placeholder test class for the generated source task.
+ *
+ * <p>It contains only commented-out connection constants and empty test stubs, so it
+ * currently verifies nothing; enable and complete them once the task is implemented.
+ */
+public class ${dbNameToCamel}SourceTaskTest {
+
+// private static final String host = "127.0.0.1";
+// private static final String port = "8123";
+// private static final String db = "default";
+// private static final String username = "default";
+// private static final String password = "123456";
+//
+// public void testPoll() {
+// }
+//
+// public void testStart() throws InterruptedException {
+// }
+}
\ No newline at end of file
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/archetype.properties
b/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/archetype.properties
new file mode 100644
index 00000000..0d5342b4
--- /dev/null
+++
b/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/archetype.properties
@@ -0,0 +1,11 @@
+#Mon Aug 14 22:31:14 CST 2023
+package=it.pkg
+version=0.1-SNAPSHOT
+groupId=archetype.it
+artifactId=basic
+databaseName=test
+dbNameToUpperCase=TEST
+dbNameToCamel=Test
+dbNameToLowerCase=test
+archetype.filteredExtensions=java,sql,yml,xml,properties,factories,ftl,md
+
diff --git
a/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/goal.txt
b/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/goal.txt
new file mode 100644
index 00000000..e69de29b