This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 44bb9bd16d11575f881ade0793d858c12ce0911f
Author: lizhiboo <[email protected]>
AuthorDate: Thu Sep 9 12:52:18 2021 +0800

    [ISSUE #801]Rocketmq connector sink for hudi (#800)
    
    * support rocketmq sink to hudi
    
    * support rocketmq sink to hudi debug
    
    * remove unused code
    
    * support task divide
    
    * support divide strategy by topic queue
    
    * support divide strategy by topic queue
    
    * add log4j.properties
    
    * upgrade javalin to 2.8.0
    
    * add log
    
    * add log
    
    * add log
    
    * add quick stat in READMQ.md
    
    * support start hudi sink by spark-submit
    
    * code style
    
    * code style
    
    * code style
    
    * code style
    
    * code style
    
    * code style
    
    * code style
---
 README.md                                          |  78 ++++++
 pom.xml                                            | 287 +++++++++++++++++++++
 .../rocketmq/connect/hudi/config/CloneUtils.java   |  50 ++++
 .../rocketmq/connect/hudi/config/ConfigUtil.java   |  70 +++++
 .../connect/hudi/config/HudiConnectConfig.java     | 173 +++++++++++++
 .../connect/hudi/config/SinkConnectConfig.java     | 139 ++++++++++
 .../apache/rocketmq/connect/hudi/config/Utils.java |  75 ++++++
 .../connect/hudi/connector/HudiSinkConnector.java  | 250 ++++++++++++++++++
 .../connect/hudi/connector/HudiSinkTask.java       | 111 ++++++++
 .../apache/rocketmq/connect/hudi/sink/Updater.java | 239 +++++++++++++++++
 .../connect/hudi/strategy/ITaskDivideStrategy.java |  27 ++
 .../hudi/strategy/TaskDivideByQueueStrategy.java   |  80 ++++++
 .../hudi/strategy/TaskDivideStrategyFactory.java   |  25 ++
 style/rmq_checkstyle.xml                           | 135 ++++++++++
 14 files changed, 1739 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..509f3b2
--- /dev/null
+++ b/README.md
@@ -0,0 +1,78 @@
+# rocketmq-connect-hudi
+
+## rocketmq-connect-hudi 打包
+```
+mvn clean install -DskipTest -U 
+```
+将target目录下打包的rocketmq-connect-hudi-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷贝到connector-runtime
 connect.conf配置的connector-plugin目录下。
+## 目前安装会遇到的问题
+
+目前的rocketmq-connect-hudi 使用的是0.8.0版本的hudi.
+
+## rocketmq-connect-hudi 启动
+
+首先,需要启动connect-runtime,参考rocketmq-connect-runtime的run_work.sh脚本。
+* **hudi-sink-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-sink-connector-name}
+?config='{"connector-class":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"topicc","tablePath":"file:///tmp/hudi_connector_test","tableName":"hudi_connector_test_table","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","source-rocketmq":"127.0.0.1:9876","src-cluster":"DefaultCluster","refresh-interval":"10000","schemaPath":"/Users/o
 [...]
+```
+启动成功会打印如下日志:
+```
+2021-09-06 16:23:14 INFO pool-2-thread-1 - Open HoodieJavaWriteClient 
successfully
+```
+>**注:** `rocketmq-hudi-connect` 
的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-hudi 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-connector-name}/stop
+```
+
+## rocketmq-connect-hudi 参数说明
+* **hudi-sink-connector 参数说明**
+
+参数 | 类型 | 是否必须 | 描述 | 样例
+|---|---|---|---|---|
+|connector-class | String | 是 | sink connector类 | HudiSinkConnector|
+|tablePath | String | 是 | sink到hudi的表路径 | file:///tmp/hudi_connector_test |
+|tableName | String | 是 | sink到hudi的表名称| hudi_connector_test_table |
+|insertShuffleParallelism | int | 是 | hudi insert并发度 | 2 |
+|upsertShuffleParallelism | int | 是 | hudi upsert并发度 | 2 |
+|deleteParallelism | int | 是 | hudi delete并发度 | 2 |
+|topicNames | String | 是 | rocketmq默认每一个数据源中的表对应一个名字,该名称需和数据库表名称相同 | jdbc_hudi 
|
+|task-divide-strategy | Integer | 否 | task 分配策略, 默认值为 
0,表示按照topic分配任务,每一个table便是一个topic | 0 |
+|task-parallelism | Integer | 否 | task parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行 
| 2 |
+|source-cluster | String | 是 | sink 端获取路由信息连接到的RocketMQ nameserver 地址 | 
172.17.0.1:10911 |
+|source-rocketmq | String | 是 | sink 端获取路由信息连接到的RocketMQ broker cluster 地址 | 
127.0.0.1:9876 |
+|source-record-converter | String | 是 | source data 解析 | 
org.apache.rocketmq.connect.runtime.converter.RocketMQConverter |
+|src-cluster | String | 否 | 源集群 | DefaultCluster |
+|refresh-interval | String | 否 | sink的刷新时间,单位ms | 10000 |
+|schemaPath | String | 是 | sink的schema地址 | /Users/osgoo/Downloads/user.avsc" |
+
+
+示例配置如下
+```js
+{
+       "connector-class": 
"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector",
+       "topicNames": "topicc",
+       "tablePath": "file:///tmp/hudi_connector_test",
+       "tableName": "hudi_connector_test_table",
+       "insertShuffleParallelism": "2",
+       "upsertShuffleParallelism": "2",
+       "deleteParallelism": "2",
+       "source-record-converter": 
"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter",
+       "source-rocketmq": "127.0.0.1:9876",
+       "src-cluster": "DefaultCluster",
+       "refresh-interval": "10000",
+       "schemaPath": "/Users/osgoo/Downloads/user.avsc"
+}
+```
+
+* **spark-submit 启动任务**
+将connect-runtime打包后通过spark-submit提交任务
+```
+nohup sh spark-submit  --class 
org.apache.rocketmq.connect.runtime.ConnectStartup --conf 
"spark.driver.extraJavaOptions=-Dlogback.configurationFile=logback.xml" --files 
/xxx/conf/connect.conf,/xxx/conf/log4j.properties  --packages 
org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-java-client:0.8.0,org.apache.parquet:parquet-avro:1.10.1,org.apache.avro:avro:1.10.2,com.alibaba:fastjson:1.2.51,org.reflections:reflections:0.9.11,org.apa
 [...]
+```
+后续操作参考rocketmq-connect-hudi启动步骤
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..97c8785
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,287 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-hudi</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <rocketmq.version>4.5.2</rocketmq.version>
+
+        <hudi.version>0.8.0</hudi.version>
+        <avro.version>1.10.2</avro.version>
+        <parquet.version>1.10.1</parquet.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    
<outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <archive>
+                        <!-- The Main Class Here doesn't make a lot sense 
since it was dynamically loaded-->
+                        <manifest>
+                            
<mainClass>org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            
<configLocation>style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            
<includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>2.6.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.6.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.12</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+            <version>0.3.1-alpha</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>4.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-java-client</artifactId>
+            <version>${hudi.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>${parquet.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.51</version>
+        </dependency>
+
+        <!-- used for spark-submit -->
+        <dependency>
+            <groupId>org.pentaho</groupId>
+            <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+            <version>5.1.5-jhyde</version>
+        </dependency>
+        <dependency>
+            <groupId>asm</groupId>
+            <artifactId>asm</artifactId>
+            <version>3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>2.3.7</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>failureaccess</artifactId>
+            <version>1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>3.3.1</version>
+        </dependency>
+
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/rocketmq/connect/hudi/config/CloneUtils.java 
b/src/main/java/org/apache/rocketmq/connect/hudi/config/CloneUtils.java
new file mode 100644
index 0000000..dc3605d
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/config/CloneUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.hudi.config;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.ObjectInputStream;
+
+public class CloneUtils {
+    private static final Logger log = 
LoggerFactory.getLogger(CloneUtils.class);
+
+    @SuppressWarnings("unchecked")
+    public static <T extends Serializable> T clone(T obj) {
+        T clonedObj = null;
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(obj);
+            oos.close();
+
+            ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
+            ObjectInputStream ois = new ObjectInputStream(bais);
+            clonedObj = (T) ois.readObject();
+            ois.close();
+        } catch (Exception e) {
+            log.error("Clone occur exception", e);
+        }
+        return clonedObj;
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/hudi/config/ConfigUtil.java 
b/src/main/java/org/apache/rocketmq/connect/hudi/config/ConfigUtil.java
new file mode 100644
index 0000000..88f8a8e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/config/ConfigUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.config;
+
+import io.openmessaging.KeyValue;
+
+import java.lang.reflect.Method;
+
+public class ConfigUtil {
+    public static <T> void load(KeyValue props, Object object) {
+
+        properties2Object(props, object);
+    }
+
+    private static <T> void properties2Object(final KeyValue p, final Object 
object) {
+
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4);
+                    String first = mn.substring(3, 4);
+
+                    String key = first.toLowerCase() + tmp;
+                    String property = p.getString(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) 
{
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || 
cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || 
cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || 
cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/hudi/config/HudiConnectConfig.java 
b/src/main/java/org/apache/rocketmq/connect/hudi/config/HudiConnectConfig.java
new file mode 100644
index 0000000..4c04605
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/connect/hudi/config/HudiConnectConfig.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.config;
+
+
+import org.apache.avro.Schema;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+public class HudiConnectConfig {
+
+    protected String tableType = "COPY_ON_WRITE";
+
+    protected String tablePath;
+
+    protected String tableName;
+
+    protected int insertShuffleParallelism = 2;
+
+    protected int upsertShuffleParallelism = 2;
+
+    protected int deleteParallelism = 2;
+
+    protected String srcRecordConverter;
+
+    protected String topicNames;
+
+    protected String indexType = "INMEMORY";
+
+    protected String schemaPath;
+
+    public Schema schema;
+
+    public static final String CONN_TASK_PARALLELISM = "task-parallelism";
+    public static final String CONN_TASK_DIVIDE_STRATEGY = 
"task-divide-strategy";
+    public static final String CONN_WHITE_LIST = "whiteDataBase";
+    public static final String CONN_SOURCE_RECORD_CONVERTER = 
"source-record-converter";
+
+    public static final String CONN_HUDI_TABLE_TYPE = "tableType";
+    public static final String CONN_HUDI_TABLE_PATH = "tablePath";
+    public static final String CONN_HUDI_TABLE_NAME = "tableName";
+    public static final String CONN_HUDI_INSERT_SHUFFLE_PARALLELISM = 
"insertShuffleParallelism";
+    public static final String CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM = 
"upsertShuffleParallelism";
+    public static final String CONN_HUDI_DELETE_PARALLELISM = 
"deleteParallelism";
+
+    public static final String CONN_TOPIC_NAMES = "topicNames";
+    public static final String CONN_TOPIC_QUEUES = "topicQueues";
+    public static final String CONN_SCHEMA_PATH = "schemaPath";
+
+    public static final String CONN_TOPIC_ROUTE_INFO = "topicRouterInfo";
+
+    public static final String CONN_SOURCE_RMQ = "source-rocketmq";
+    public static final String CONN_SOURCE_CLUSTER = "source-cluster";
+    public static final String REFRESH_INTERVAL = "refresh.interval";
+
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+        {
+            add(CONN_HUDI_TABLE_PATH);
+            add(CONN_HUDI_TABLE_NAME);
+            add(CONN_HUDI_INSERT_SHUFFLE_PARALLELISM);
+            add(CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM);
+            add(CONN_HUDI_DELETE_PARALLELISM);
+            add(CONN_SOURCE_RECORD_CONVERTER);
+            add(CONN_TOPIC_NAMES);
+            add(CONN_SCHEMA_PATH);
+        }
+    };
+
+    public String getTableType() {
+        return tableType;
+    }
+
+    public void setTableType(String tableType) {
+        this.tableType = tableType;
+    }
+
+    public String getTablePath() {
+        return tablePath;
+    }
+
+    public void setTablePath(String tablePath) {
+        this.tablePath = tablePath;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public int getInsertShuffleParallelism() {
+        return insertShuffleParallelism;
+    }
+
+    public void setInsertShuffleParallelism(int insertShuffleParallelism) {
+        this.insertShuffleParallelism = insertShuffleParallelism;
+    }
+
+    public int getUpsertShuffleParallelism() {
+        return upsertShuffleParallelism;
+    }
+
+    public void setUpsertShuffleParallelism(int upsertShuffleParallelism) {
+        this.upsertShuffleParallelism = upsertShuffleParallelism;
+    }
+
+    public int getDeleteParallelism() {
+        return deleteParallelism;
+    }
+
+    public void setDeleteParallelism(int deleteParallelism) {
+        this.deleteParallelism = deleteParallelism;
+    }
+
+    public String getSrcRecordConverter() {
+        return srcRecordConverter;
+    }
+
+    public void setSrcRecordConverter(String srcRecordConverter) {
+        this.srcRecordConverter = srcRecordConverter;
+    }
+
+    public String getTopicNames() {
+        return topicNames;
+    }
+
+    public void setTopicNames(String topicNames) {
+        this.topicNames = topicNames;
+    }
+
+    public String getIndexType() {
+        return indexType;
+    }
+
+    public void setIndexType(String indexType) {
+        this.indexType = indexType;
+    }
+
+    public String getSchemaPath() {
+        return schemaPath;
+    }
+
+    public void setSchemaPath(String schemaPath) {
+        this.schemaPath = schemaPath;
+    }
+
+    public Schema getSchema() {
+        return schema;
+    }
+
+    public void setSchema(Schema schema) {
+        this.schema = schema;
+    }
+}
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/rocketmq/connect/hudi/config/SinkConnectConfig.java 
b/src/main/java/org/apache/rocketmq/connect/hudi/config/SinkConnectConfig.java
new file mode 100644
index 0000000..943df40
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/connect/hudi/config/SinkConnectConfig.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class SinkConnectConfig extends HudiConnectConfig {
+    private Set<String> whiteList;
+    private String srcNamesrvs;
+    private String srcCluster;
+    private long refreshInterval;
+    private Map<String, Set<MessageQueue>> topicRouteMap;
+    public int taskParallelism;
+    private String taskDivideStrategy;
+
+    public SinkConnectConfig(){
+    }
+
+    public void validate(KeyValue config) {
+        buildWhiteList(config);
+        this.tablePath = 
config.getString(HudiConnectConfig.CONN_HUDI_TABLE_PATH);
+        this.tableName = 
config.getString(HudiConnectConfig.CONN_HUDI_TABLE_NAME);
+        this.insertShuffleParallelism = 
config.getInt(HudiConnectConfig.CONN_HUDI_INSERT_SHUFFLE_PARALLELISM);
+        this.deleteParallelism = 
config.getInt(HudiConnectConfig.CONN_HUDI_DELETE_PARALLELISM);
+        this.upsertShuffleParallelism = 
config.getInt(HudiConnectConfig.CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM);
+        
this.setSrcRecordConverter(config.getString(HudiConnectConfig.CONN_SOURCE_RECORD_CONVERTER));
+        
this.setTopicNames(config.getString(HudiConnectConfig.CONN_TOPIC_NAMES));
+        
this.setSchemaPath(config.getString(HudiConnectConfig.CONN_SCHEMA_PATH));
+
+        this.srcNamesrvs = config.getString(HudiConnectConfig.CONN_SOURCE_RMQ);
+        this.srcCluster = 
config.getString(HudiConnectConfig.CONN_SOURCE_CLUSTER);
+        this.refreshInterval = 
config.getLong(HudiConnectConfig.REFRESH_INTERVAL, 3);
+
+    }
+
+    private void buildWhiteList(KeyValue config) {
+        this.whiteList = new HashSet<>();
+        String whiteListStr = 
config.getString(HudiConnectConfig.CONN_TOPIC_NAMES, "");
+        String[] wl = whiteListStr.trim().split(",");
+        if (wl.length <= 0)
+            throw new IllegalArgumentException("White list must be not 
empty.");
+        else {
+            this.whiteList.clear();
+            for (String t : wl) {
+                this.whiteList.add(t.trim());
+            }
+        }
+    }
+
+
+    public Set<String> getWhiteList() {
+        return whiteList;
+    }
+
+    public void setWhiteList(Set<String> whiteList) {
+        this.whiteList = whiteList;
+    }
+
+    public String getSrcNamesrvs() {
+        return this.srcNamesrvs;
+    }
+
+    public String getSrcCluster() {
+        return this.srcCluster;
+    }
+
+    public long getRefreshInterval() {
+        return this.refreshInterval;
+    }
+
+    public Map<String, Set<MessageQueue>> getTopicRouteMap() {
+        return topicRouteMap;
+    }
+
+    public void setTopicRouteMap(Map<String, Set<MessageQueue>> topicRouteMap) 
{
+        this.topicRouteMap = topicRouteMap;
+    }
+
+    public Set<String> getWhiteTopics() {
+        return getWhiteList();
+    }
+
+    public int getTaskParallelism() {
+        return taskParallelism;
+    }
+
+    public void setTaskParallelism(int taskParallelism) {
+        this.taskParallelism = taskParallelism;
+    }
+
+    public String getTaskDivideStrategy() {
+        return taskDivideStrategy;
+    }
+
+    public void setTaskDivideStrategy(String taskDivideStrategy) {
+        this.taskDivideStrategy = taskDivideStrategy;
+    }
+
+    @Override
+    public String toString() {
+        return "SinkConnectConfig{" +
+                "whiteList=" + whiteList +
+                ", srcNamesrvs='" + srcNamesrvs + '\'' +
+                ", srcCluster='" + srcCluster + '\'' +
+                ", refreshInterval=" + refreshInterval +
+                ", topicRouteMap=" + topicRouteMap +
+                ", tableType='" + tableType + '\'' +
+                ", tablePath='" + tablePath + '\'' +
+                ", tableName='" + tableName + '\'' +
+                ", insertShuffleParallelism=" + insertShuffleParallelism +
+                ", upsertShuffleParallelism=" + upsertShuffleParallelism +
+                ", deleteParallelism=" + deleteParallelism +
+                ", indexType='" + indexType + '\'' +
+                ", schemaPath='" + schemaPath + '\'' +
+                ", schema=" + schema +
+                '}';
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/config/Utils.java 
b/src/main/java/org/apache/rocketmq/connect/hudi/config/Utils.java
new file mode 100644
index 0000000..d9bc6fe
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/config/Utils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.config;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Utils {
+    private static final Logger log = LoggerFactory.getLogger(Utils.class);
+
+    public static String createGroupName(String prefix) {
+        return new 
StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+    }
+
+    public static String createGroupName(String prefix, String postfix) {
+        return new 
StringBuilder().append(prefix).append("-").append(postfix).toString();
+    }
+
+    public static String createTaskId(String prefix) {
+        return new 
StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+    }
+
+    public static String createInstanceName(String namesrvAddr) {
+        String[] namesrvArray = namesrvAddr.split(";");
+        List<String> namesrvList = new ArrayList<>();
+        for (String ns : namesrvArray) {
+            if (!namesrvList.contains(ns)) {
+                namesrvList.add(ns);
+            }
+        }
+        Collections.sort(namesrvList);
+        return String.valueOf(namesrvList.toString().hashCode());
+    }
+
+    public static List<BrokerData> examineBrokerData(DefaultMQAdminExt 
defaultMQAdminExt, String topic,
+        String cluster) throws RemotingException, MQClientException, 
InterruptedException {
+        List<BrokerData> brokerList = new ArrayList<>();
+
+        TopicRouteData topicRouteData = 
defaultMQAdminExt.examineTopicRouteInfo(topic);
+        if (topicRouteData.getBrokerDatas() != null) {
+            for (BrokerData broker : topicRouteData.getBrokerDatas()) {
+                if (StringUtils.equals(broker.getCluster(), cluster)) {
+                    brokerList.add(broker);
+                }
+            }
+        }
+        return brokerList;
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkConnector.java
 
b/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkConnector.java
new file mode 100644
index 0000000..a496418
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkConnector.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.connector;
+
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig;
+import org.apache.rocketmq.connect.hudi.config.SinkConnectConfig;
+import org.apache.rocketmq.connect.hudi.config.CloneUtils;
+import org.apache.rocketmq.connect.hudi.config.Utils;
+import org.apache.rocketmq.connect.hudi.strategy.ITaskDivideStrategy;
+import org.apache.rocketmq.connect.hudi.strategy.TaskDivideStrategyFactory;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+
+public class HudiSinkConnector extends SinkConnector {
+    private static final Logger log = 
LoggerFactory.getLogger(HudiSinkConnector.class);
+    private volatile boolean configValid = false;
+    private ScheduledExecutorService executor;
+    private HashMap<String, Set<MessageQueue>> topicRouteMap;
+
+    private DefaultMQAdminExt srcMQAdminExt;
+    private SinkConnectConfig sinkConnectConfig;
+
+    private volatile boolean adminStarted;
+
+    private ScheduledFuture<?> listenerHandle;
+    public static final String HUDI_CONNECTOR_ADMIN_PREFIX = 
"HUDI-CONNECTOR-ADMIN";
+    public static final String PREFIX = "hudi";
+
+    public HudiSinkConnector() {
+        topicRouteMap = new HashMap<>();
+        sinkConnectConfig = new SinkConnectConfig();
+        executor = Executors.newSingleThreadScheduledExecutor(new 
BasicThreadFactory.Builder().namingPattern("HudiFSinkConnector-SinkWatcher-%d").daemon(true).build());
+    }
+
+    private synchronized void startMQAdminTools() {
+        if (!configValid || adminStarted) {
+            return;
+        }
+        RPCHook rpcHook = null;
+        this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        
this.srcMQAdminExt.setNamesrvAddr(this.sinkConnectConfig.getSrcNamesrvs());
+        
this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(HUDI_CONNECTOR_ADMIN_PREFIX));
+        
this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.sinkConnectConfig.getSrcNamesrvs()));
+
+        try {
+            log.info("Trying to start srcMQAdminExt");
+            this.srcMQAdminExt.start();
+            log.info("RocketMQ srcMQAdminExt started");
+
+        } catch (MQClientException e) {
+            log.error("Hudi Sink Task start failed for `srcMQAdminExt` 
exception.", e);
+        }
+
+        adminStarted = true;
+    }
+
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+        for (String requestKey : HudiConnectConfig.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
+                return "Request config key: " + requestKey;
+            }
+        }
+        try {
+            this.sinkConnectConfig.validate(config);
+        } catch (IllegalArgumentException e) {
+            return e.getMessage();
+        }
+        this.configValid = true;
+
+        return "";
+    }
+
+    @Override
+    public void start() {
+        startMQAdminTools();
+        startListener();
+    }
+
+    public void startListener() {
+        listenerHandle = executor.scheduleAtFixedRate(new Runnable() {
+            boolean first = true;
+            HashMap<String, Set<MessageQueue>> origin = null;
+
+            @Override
+            public void run() {
+                buildRoute();
+                if (first) {
+                    origin = CloneUtils.clone(topicRouteMap);
+                    first = false;
+                }
+                if (!compare(origin, topicRouteMap)) {
+                    context.requestTaskReconfiguration();
+                    origin = CloneUtils.clone(topicRouteMap);
+                }
+            }
+        }, sinkConnectConfig.getRefreshInterval(), 
sinkConnectConfig.getRefreshInterval(), TimeUnit.SECONDS);
+    }
+
+    public boolean compare(Map<String, Set<MessageQueue>> origin, Map<String, 
Set<MessageQueue>> updated) {
+        if (origin.size() != updated.size()) {
+            return false;
+        }
+        for (Map.Entry<String, Set<MessageQueue>> entry : origin.entrySet()) {
+            if (!updated.containsKey(entry.getKey())) {
+                return false;
+            }
+            Set<MessageQueue> originTasks = entry.getValue();
+            Set<MessageQueue> updateTasks = updated.get(entry.getKey());
+            if (originTasks.size() != updateTasks.size()) {
+                return false;
+            }
+
+            if (!originTasks.containsAll(updateTasks)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public void buildRoute() {
+        String srcCluster = this.sinkConnectConfig.getSrcCluster();
+        try {
+            for (String topic : this.sinkConnectConfig.getWhiteList()) {
+
+                // different from BrokerData with cluster field, which can 
ensure the brokerData is from expected cluster.
+                // QueueData use brokerName as unique info on cluster of 
rocketmq. so when we want to get QueueData of
+                // expected cluster, we should get brokerNames of expected 
cluster, and then filter queueDatas.
+                List<BrokerData> brokerList = 
Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
+                Set<String> brokerNameSet = new HashSet<String>();
+                for (BrokerData b : brokerList) {
+                    brokerNameSet.add(b.getBrokerName());
+                }
+
+                TopicRouteData topicRouteData = 
srcMQAdminExt.examineTopicRouteInfo(topic);
+                if (!topicRouteMap.containsKey(topic)) {
+                    topicRouteMap.put(topic, new HashSet<>(16));
+                }
+                for (QueueData qd : topicRouteData.getQueueDatas()) {
+                    if (brokerNameSet.contains(qd.getBrokerName())) {
+                        for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                            MessageQueue taskTopicInfo = new 
MessageQueue(topic, qd.getBrokerName(), i);
+                            topicRouteMap.get(topic).add(taskTopicInfo);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("Fetch topic list error.", e);
+        } finally {
+            srcMQAdminExt.shutdown();
+        }
+    }
+
+
+    /**
+     * We need to reason why we don't call srcMQAdminExt.shutdown() here, and 
why
+     * it can be applied to srcMQAdminExt
+     */
+    @Override
+    public void stop() {
+        listenerHandle.cancel(true);
+        srcMQAdminExt.shutdown();
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return HudiSinkTask.class;
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs() {
+        log.info("List.start");
+        if (!configValid) {
+            return new ArrayList<KeyValue>();
+        }
+        startMQAdminTools();
+        buildRoute();
+        DefaultKeyValue defaultKeyValue = new DefaultKeyValue();
+        defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_TABLE_PATH, 
sinkConnectConfig.getTablePath());
+        defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_TABLE_NAME, 
sinkConnectConfig.getTableName());
+        
defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_INSERT_SHUFFLE_PARALLELISM, 
sinkConnectConfig.getInsertShuffleParallelism());
+        
defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM, 
sinkConnectConfig.getUpsertShuffleParallelism());
+        defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_DELETE_PARALLELISM, 
sinkConnectConfig.getDeleteParallelism());
+        defaultKeyValue.put(HudiConnectConfig.CONN_SOURCE_RECORD_CONVERTER, 
sinkConnectConfig.getSrcRecordConverter());
+        defaultKeyValue.put(HudiConnectConfig.CONN_TOPIC_NAMES, 
sinkConnectConfig.getTopicNames());
+        defaultKeyValue.put(HudiConnectConfig.CONN_SCHEMA_PATH, 
sinkConnectConfig.getSchemaPath());
+        defaultKeyValue.put(HudiConnectConfig.CONN_TASK_PARALLELISM, 
sinkConnectConfig.getTaskParallelism());
+        defaultKeyValue.put(HudiConnectConfig.CONN_TASK_DIVIDE_STRATEGY, 
sinkConnectConfig.getTaskDivideStrategy());
+        defaultKeyValue.put(HudiConnectConfig.CONN_WHITE_LIST, 
JSONObject.toJSONString(sinkConnectConfig.getWhiteList()));
+        defaultKeyValue.put(HudiConnectConfig.CONN_SCHEMA_PATH, 
sinkConnectConfig.getSchemaPath());
+        defaultKeyValue.put(HudiConnectConfig.CONN_TOPIC_ROUTE_INFO, 
JSONObject.toJSONString(sinkConnectConfig.getTopicRouteMap()));
+        log.info("taskConfig : " + defaultKeyValue + ", sinkConnectConfig : " 
+ sinkConnectConfig);
+        ITaskDivideStrategy strategy = TaskDivideStrategyFactory.getInstance();
+        List<KeyValue> taskConfigs = strategy.divide(defaultKeyValue);
+        return taskConfigs;
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java 
b/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
new file mode 100644
index 0000000..b01c660
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.common.QueueMetaData;
+import io.openmessaging.connector.api.data.SinkDataEntry;
+import io.openmessaging.connector.api.sink.SinkTask;
+import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig;
+import org.apache.rocketmq.connect.hudi.config.ConfigUtil;
+import org.apache.rocketmq.connect.hudi.sink.Updater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+
+/**
+ * In the naming, we are using database for "keyspaces" and table for 
"columnFamily"
+ * This is because we kind of want the abstract data source to be aligned with 
SQL databases
+ */
+public class HudiSinkTask extends SinkTask {
+    private static final Logger log = 
LoggerFactory.getLogger(HudiSinkTask.class);
+
+    private HudiConnectConfig hudiConnectConfig;
+    private Updater updater;
+
+    public HudiSinkTask() {
+        this.hudiConnectConfig = new HudiConnectConfig();
+    }
+
+    @Override
+    public void put(Collection<SinkDataEntry> sinkDataEntries) {
+        try {
+            log.info("Hudi Sink Task trying to put()");
+            for (SinkDataEntry record : sinkDataEntries) {
+                log.info("Hudi Sink Task trying to call updater.push()");
+                Boolean isSuccess = updater.push(record);
+                if (!isSuccess) {
+                    log.error("Hudi sink push data error, record:{}", record);
+                }
+                log.debug("Hudi pushed data : " + record);
+            }
+        } catch (Exception e) {
+            log.error("put sinkDataEntries error, {}", e);
+        }
+    }
+
+    @Override
+    public void commit(Map<QueueMetaData, Long> map) {
+
+    }
+
+    /**
+     * Remember always close the CqlSession according to
+     * https://docs.datastax.com/en/developer/java-driver/4.5/manual/core/
+     * @param props
+     */
+    @Override
+    public void start(KeyValue props) {
+        try {
+            ConfigUtil.load(props, this.hudiConnectConfig);
+            log.info("init data source success");
+        } catch (Exception e) {
+            log.error("Cannot start Hudi Sink Task because of configuration 
error{}", e);
+        }
+        try {
+            updater = new Updater(hudiConnectConfig);
+            updater.start();
+        } catch (Throwable e) {
+            log.error("fail to start updater{}", e);
+        }
+
+    }
+
+    @Override
+    public void stop() {
+        try {
+            updater.stop();
+            log.info("hudi sink task connection is closed.");
+        } catch (Throwable e) {
+            log.warn("sink task stop error while closing connection to {}", 
"hudi", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java 
b/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
new file mode 100644
index 0000000..8e7e288
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.hudi.sink;
+
+
+import io.openmessaging.connector.api.data.SinkDataEntry;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.GenericDataSupplier;
+import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class Updater {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private HudiConnectConfig hudiConnectConfig;
+    private HoodieJavaWriteClient hudiWriteClient;
+    private HoodieWriteConfig cfg;
+    private transient ScheduledExecutorService scheduledExecutor;
+    private int flushIntervalMs = 3000;
+    private int batchSize = 100;
+    private List<SinkDataEntry> inflightList;
+    private Object batchLocker = new Object();
+
+
+    public Updater(HudiConnectConfig hudiConnectConfig) throws Exception {
+        this.hudiConnectConfig = hudiConnectConfig;
+
+        try {
+            File schemaFile = new File(hudiConnectConfig.getSchemaPath());
+            this.hudiConnectConfig.schema = new 
Schema.Parser().parse(schemaFile);
+            log.info("Hudi schema : " + 
this.hudiConnectConfig.schema.toString());
+        } catch (IOException e) {
+            throw new Exception(String.format("Failed to find schema file %s", 
hudiConnectConfig.getSchemaPath()), e);
+        }
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+        hadoopConf.set(AvroReadSupport.AVRO_DATA_SUPPLIER, 
GenericDataSupplier.class.getName());
+        hadoopConf.setClassLoader(this.getClass().getClassLoader());
+        hadoopConf.set("fs.hdfs.impl",
+                DistributedFileSystem.class.getName()
+        );
+        hadoopConf.set("fs.file.impl",
+                LocalFileSystem.class.getName()
+        );
+
+        // fs.%s.impl.disable.cache
+        hadoopConf.set("fs.file.impl.disable.cache", String.valueOf(true));
+        
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+        Path path = new Path(hudiConnectConfig.getTablePath());
+        FileSystem fs = FSUtils.getFs(hudiConnectConfig.getTablePath(), 
hadoopConf);
+        if (!fs.exists(path)) {
+            HoodieTableMetaClient.withPropertyBuilder()
+                    .setTableType(hudiConnectConfig.getTableType())
+                    .setTableName(hudiConnectConfig.getTableName())
+                    .setPayloadClassName(HoodieAvroPayload.class.getName())
+                    .initTable(hadoopConf, hudiConnectConfig.getTablePath());
+        }
+        log.info("Hudi inited table");
+
+        this.cfg = 
HoodieWriteConfig.newBuilder().withPath(hudiConnectConfig.getTablePath())
+                .withSchema(this.hudiConnectConfig.schema.toString())
+                .withEngineType(EngineType.JAVA)
+                
.withParallelism(hudiConnectConfig.getInsertShuffleParallelism(), 
hudiConnectConfig.getUpsertShuffleParallelism())
+                
.withDeleteParallelism(hudiConnectConfig.getDeleteParallelism()).forTable(hudiConnectConfig.getTableName())
+                
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+                
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20,
 30).build()).build();
+        cfg.getAvroSchemaValidate();
+        this.hudiWriteClient =
+                new HoodieJavaWriteClient<HoodieAvroPayload>(new 
HoodieJavaEngineContext(hadoopConf), cfg);
+        log.info("Open HoodieJavaWriteClient successfully");
+
+        inflightList = new ArrayList<>();
+        if (batchSize > 0) {
+            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+            scheduledExecutor.scheduleAtFixedRate(
+                () -> {
+                    try {
+                        commit();
+                    } catch (Exception e) {
+                        log.error("Flush error when executed at fixed rate", 
e);
+                    }
+                }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private GenericRecord sinkDataEntry2GenericRecord(SinkDataEntry record) {
+        byte[] recordBytes = (byte[]) record.getPayload()[0];
+        GenericRecord genericRecord = new 
GenericData.Record(this.hudiConnectConfig.schema);
+        DatumReader<GenericRecord> userDatumReader = new 
SpecificDatumReader<GenericRecord>(this.hudiConnectConfig.schema);
+        BinaryDecoder decoder = 
DecoderFactory.get().binaryDecoder(recordBytes, null);
+        try {
+            if (!decoder.isEnd()) {
+                genericRecord = userDatumReader.read(genericRecord, decoder);
+            }
+        } catch (IOException e) {
+            log.error("SinkDataEntry convert to GenericRecord occur error,", 
e);
+        }
+        return genericRecord;
+    }
+
+    public boolean push(SinkDataEntry record) {
+        log.info("Updater Trying to push data");
+        Boolean isSuccess = true;
+        if (record == null) {
+            log.warn("Updater push sinkDataRecord null.");
+            return true;
+        }
+        synchronized (batchLocker) {
+            inflightList.add(record);
+        }
+        if (inflightList.size() >= batchSize) {
+            try {
+                scheduledExecutor.submit(this::commit);
+            } catch (Exception e) {
+                log.error("Updater commmit occur error", e);
+                isSuccess = false;
+            }
+        }
+        return isSuccess;
+    }
+
+    private void schemaEvolution(Schema newSchema, Schema oldSchema) {
+        if (null != oldSchema && 
oldSchema.toString().equals(newSchema.toString())) {
+            return;
+        }
+        log.info("Schema changed. New schema is " + newSchema.toString());
+        this.cfg = 
HoodieWriteConfig.newBuilder().withPath(hudiConnectConfig.getTablePath())
+                .withSchema(this.hudiConnectConfig.schema.toString())
+                .withEngineType(EngineType.JAVA)
+                
.withParallelism(hudiConnectConfig.getInsertShuffleParallelism(), 
hudiConnectConfig.getUpsertShuffleParallelism())
+                
.withDeleteParallelism(hudiConnectConfig.getDeleteParallelism()).forTable(hudiConnectConfig.getTableName())
+                
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+                
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20,
 30).build()).build();
+        this.hudiWriteClient.close();
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+        hadoopConf.set(AvroReadSupport.AVRO_DATA_SUPPLIER, 
GenericDataSupplier.class.getName());
+        this.hudiWriteClient =
+                new HoodieJavaWriteClient<HoodieAvroPayload>(new 
HoodieJavaEngineContext(hadoopConf), cfg);
+    }
+
+    public void commit() {
+        List<SinkDataEntry> commitList;
+        if (inflightList.isEmpty()) {
+            return;
+        }
+        synchronized (this.inflightList) {
+            commitList = inflightList;
+            inflightList = new ArrayList<>();
+        }
+        List<HoodieRecord> hoodieRecordsList = new ArrayList<>();
+        for (SinkDataEntry record : commitList) {
+            GenericRecord genericRecord = sinkDataEntry2GenericRecord(record);
+            HoodieRecord<HoodieAvroPayload> hoodieRecord = new 
HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "shardingKey-" + 
record.getQueueName()), new HoodieAvroPayload(Option.of(genericRecord)));
+            hoodieRecordsList.add(hoodieRecord);
+        }
+        try {
+            List<WriteStatus> statuses = 
hudiWriteClient.upsert(hoodieRecordsList, hudiWriteClient.startCommit());
+            log.info("Upserted data to hudi");
+            long upserted = statuses.get(0).getStat().getNumInserts();
+            if (upserted != commitList.size()) {
+                log.warn("Upserted num not equals input");
+            }
+        } catch (Exception e) {
+            log.error("Exception when upserting to Hudi", e);
+        }
+    }
+
+    public void start() throws Exception {
+        log.info("schema load success");
+    }
+
+    public void stop() {
+        this.hudiWriteClient.close();
+        log.info("Hudi sink updater stopped.");
+    }
+
+    public HudiConnectConfig getHudiConnectConfig() {
+        return hudiConnectConfig;
+    }
+
+    public void setHudiConnectConfig(HudiConnectConfig hudiConnectConfig) {
+        this.hudiConnectConfig = hudiConnectConfig;
+    }
+
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/hudi/strategy/ITaskDivideStrategy.java
 
b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/ITaskDivideStrategy.java
new file mode 100644
index 0000000..a91c066
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/ITaskDivideStrategy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.strategy;
+
+import io.openmessaging.KeyValue;
+
+import java.util.List;
+
+
+public interface ITaskDivideStrategy {
+    List<KeyValue> divide(KeyValue source);
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideByQueueStrategy.java
 
b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideByQueueStrategy.java
new file mode 100644
index 0000000..c68e17c
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideByQueueStrategy.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.strategy;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig;
+
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+public class TaskDivideByQueueStrategy implements ITaskDivideStrategy {
+    @Override
+    public List<KeyValue> divide(KeyValue source) {
+        List<KeyValue> config = new ArrayList<KeyValue>();
+        int parallelism = 
source.getInt(HudiConnectConfig.CONN_TASK_PARALLELISM);
+        Map<String, MessageQueue> topicRouteInfos = (Map<String, 
MessageQueue>) 
JSONObject.parse(source.getString(HudiConnectConfig.CONN_TOPIC_ROUTE_INFO));
+        int id = 0;
+        List<List<String>> taskTopicQueues = new ArrayList<>(parallelism);
+        for (Map.Entry<String, MessageQueue> topicQueue : 
topicRouteInfos.entrySet()) {
+            MessageQueue messageQueue = topicQueue.getValue();
+            String topicQueueStr = messageQueue.getTopic() + "," + 
messageQueue.getBrokerName() + "," + messageQueue.getQueueId();
+            int ind = ++id % parallelism;
+            if (taskTopicQueues.get(ind) != null) {
+                List<String> taskTopicQueue = new LinkedList<>();
+                taskTopicQueue.add(topicQueueStr);
+                taskTopicQueues.add(ind, taskTopicQueue);
+            } else {
+                List<String> taskTopicQueue = taskTopicQueues.get(ind);
+                taskTopicQueue.add(topicQueueStr);
+            }
+        }
+
+        for (int i = 0; i < parallelism; i++) {
+            // build single task queue config; format is 
topicName1,brokerName1,queueId1;topicName1,brokerName1,queueId2
+            String singleTaskTopicQueueStr = "";
+            List<String> singleTaskTopicQueues = taskTopicQueues.get(i);
+            for (String singleTopicQueue : singleTaskTopicQueues) {
+                singleTaskTopicQueueStr += singleTopicQueue + ";";
+            }
+            singleTaskTopicQueueStr = singleTaskTopicQueueStr.substring(0, 
singleTaskTopicQueueStr.length() - 1);
+            // fill connect config;
+            KeyValue keyValue = new DefaultKeyValue();
+            keyValue.put(HudiConnectConfig.CONN_TOPIC_QUEUES, 
singleTaskTopicQueueStr);
+            keyValue.put(HudiConnectConfig.CONN_HUDI_TABLE_PATH, 
source.getString(HudiConnectConfig.CONN_HUDI_TABLE_PATH));
+            keyValue.put(HudiConnectConfig.CONN_HUDI_TABLE_NAME, 
source.getString(HudiConnectConfig.CONN_HUDI_TABLE_NAME));
+            
keyValue.put(HudiConnectConfig.CONN_HUDI_INSERT_SHUFFLE_PARALLELISM, 
source.getInt(HudiConnectConfig.CONN_HUDI_INSERT_SHUFFLE_PARALLELISM));
+            
keyValue.put(HudiConnectConfig.CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM, 
source.getInt(HudiConnectConfig.CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM));
+            keyValue.put(HudiConnectConfig.CONN_HUDI_DELETE_PARALLELISM, 
source.getInt(HudiConnectConfig.CONN_HUDI_DELETE_PARALLELISM));
+            keyValue.put(HudiConnectConfig.CONN_SOURCE_RECORD_CONVERTER, 
source.getString(HudiConnectConfig.CONN_SOURCE_RECORD_CONVERTER));
+            keyValue.put(HudiConnectConfig.CONN_SCHEMA_PATH, 
source.getString(HudiConnectConfig.CONN_SCHEMA_PATH));
+            keyValue.put(HudiConnectConfig.CONN_TASK_PARALLELISM, 
source.getInt(HudiConnectConfig.CONN_TASK_PARALLELISM));
+            keyValue.put(HudiConnectConfig.CONN_SCHEMA_PATH, 
source.getString(HudiConnectConfig.CONN_SCHEMA_PATH));
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideStrategyFactory.java
 
b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideStrategyFactory.java
new file mode 100644
index 0000000..1d693a8
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideStrategyFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.strategy;
+
+
+public class TaskDivideStrategyFactory {
+    public static ITaskDivideStrategy getInstance() {
+        return new TaskDivideByQueueStrategy();
+    }
+}
diff --git a/style/rmq_checkstyle.xml b/style/rmq_checkstyle.xml
new file mode 100644
index 0000000..776b305
--- /dev/null
+++ b/style/rmq_checkstyle.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd";>
+<!--Refer 
http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding
 -->
+<module name="Checker">
+
+    <property name="localeLanguage" value="en"/>
+
+    <!--To configure the check to report on the first instance in each file-->
+    <module name="FileTabCharacter"/>
+
+    <!-- header -->
+    <module name="RegexpHeader">
+        <property name="header" value="/\*\nLicensed to the Apache Software 
Foundation*"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Prohibit invoking System.out.println 
in source code !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//FIXME"/>
+        <property name="message" value="Recommended fix FIXME task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//TODO"/>
+        <property name="message" value="Recommended fix TODO task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="@alibaba"/>
+        <property name="message" value="Recommended remove @alibaba keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@taobao"/>
+        <property name="message" value="Recommended remove @taobao keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@author"/>
+        <property name="message" value="Recommended remove @author tag in 
javadoc!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format"
+                  
value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+        <property name="message" value="Not allow chinese character !"/>
+    </module>
+
+    <module name="FileLength">
+        <property name="max" value="3000"/>
+    </module>
+
+    <module name="TreeWalker">
+
+        <module name="UnusedImports">
+            <property name="processJavadoc" value="true"/>
+        </module>
+        <module name="RedundantImport"/>
+
+        <!--<module name="IllegalImport" />-->
+
+        <!--Checks that classes that override equals() also override 
hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds 
code like if (topic == true), topic || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements. For example 
the following code-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Check that the default is after all the cases in producerGroup 
switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" 
value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to 
producerGroup format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch 
parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <module name="PackageName"/>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName"/>
+        <module name="TypeName"/>
+        <!--Checks that there are no import statements that use the * 
notation-->
+        <module name="AvoidStarImport"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="NoWhitespaceBefore"/>
+        <module name="WhitespaceAfter"/>
+        <module name="NoWhitespaceAfter"/>
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+        </module>
+        <module name="Indentation"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+    </module>
+</module>

Reply via email to