This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new cf49208c [OSPP2023] Feat: Support HBase Sink connector (#522)
cf49208c is described below
commit cf49208c816e47dc5a8776f206778b3265d0c1dd
Author: Ao Qiao <[email protected]>
AuthorDate: Sun Sep 24 19:39:30 2023 +0800
[OSPP2023] Feat: Support HBase Sink connector (#522)
* initial commit
* commit
* rename
* finish source connector
* finish sink connector
* add license
* Update README.md
* Update connect-standalone.conf
* init
* commit
* support hbase sink connector
* merge master
* merge master
* add licence
* remove redundant dependency
* ci
---
connectors/rocketmq-connect-hbase/README.md | 30 +++
connectors/rocketmq-connect-hbase/pom.xml | 201 +++++++++++++++++++++
.../connect/hbase/config/HBaseConstants.java | 44 +++++
.../connect/hbase/config/HBaseSinkConfig.java | 127 +++++++++++++
.../connect/hbase/helper/HBaseHelperClient.java | 135 ++++++++++++++
.../connect/hbase/sink/HBaseSinkConnector.java | 57 ++++++
.../rocketmq/connect/hbase/sink/HBaseSinkTask.java | 83 +++++++++
.../connect/hbase/sink/HBaseSinkTaskTest.java | 140 ++++++++++++++
8 files changed, 817 insertions(+)
diff --git a/connectors/rocketmq-connect-hbase/README.md
b/connectors/rocketmq-connect-hbase/README.md
new file mode 100644
index 00000000..f168ea7a
--- /dev/null
+++ b/connectors/rocketmq-connect-hbase/README.md
@@ -0,0 +1,30 @@
+
+##### HBaseSinkConnector fully-qualified name
+org.apache.rocketmq.connect.hbase.sink.HBaseSinkConnector
+
+**hbase-sink-connector** start
+
+```
+POST http://${runtime-ip}:${runtime-port}/connectors/HBaseSinkConnector
+{
+    "connector.class":"org.apache.rocketmq.connect.hbase.sink.HBaseSinkConnector",
+    "zkquorum":"localhost:2181",
+    "columnfamily":"cf",
+    "username":"default",
+    "password":"123456",
+    "connect.topicnames":"testHBaseTopic",
+    "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+    "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
+
+##### parameter configuration
+
+| parameter          | effect                                         | required        | default |
+|--------------------|------------------------------------------------|-----------------|---------|
+| zkquorum           | The Endpoint of the Zookeeper server           | yes             | null    |
+| columnfamily       | The Column Family of the Destination table     | yes             | null    |
+| username           | The UserName to login to HBase server          | yes             | null    |
+| password           | The Password of the UserName                   | no              | null    |
+| hbasemaster        | The Endpoint of HBase Master server            | no              | null    |
+| connect.topicnames | RocketMQ topic for sink connector to read from | yes (sink only) | null    |
diff --git a/connectors/rocketmq-connect-hbase/pom.xml
b/connectors/rocketmq-connect-hbase/pom.xml
new file mode 100644
index 00000000..689efc33
--- /dev/null
+++ b/connectors/rocketmq-connect-hbase/pom.xml
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor
+ license agreements. See the NOTICE file distributed with this work for
additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not
use
+ this file except in compliance with the License. You may obtain a copy
of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required
+ by applicable law or agreed to in writing, software distributed under
the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-hbase</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <name>connect-hbase</name>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <issueManagement>
+ <system>jira</system>
+ <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+ </issueManagement>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <version>0.12</version>
+ <configuration>
+ <excludes>
+ <exclude>README.md</exclude>
+ <exclude>README-CN.md</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>0.1.4</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ <version>1.8.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.83</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+   <!-- Pinned: the floating "RELEASE" metaversion resolves to whatever is
+        newest at build time, which makes builds non-reproducible. -->
+   <groupId>junit</groupId>
+   <artifactId>junit</artifactId>
+   <version>4.13.2</version>
+   <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>2.4.8</version>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.24</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/config/HBaseConstants.java
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/config/HBaseConstants.java
new file mode 100644
index 00000000..050ee689
--- /dev/null
+++
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/config/HBaseConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hbase.config;
+
+/**
+ * String constants shared by the HBase sink connector: the property names
+ * written into the HBase client configuration and the keys read from the
+ * connector's own configuration.
+ */
+public final class HBaseConstants {
+
+    // ---- Property names set on the HBase client Configuration ----
+
+    public static final String HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+
+    public static final String HBASE_MASTER = "hbase.master";
+
+    public static final String HBASE_CLIENT_USERNAME = "hbase.client.username";
+
+    public static final String HBASE_CLIENT_PASSWORD = "hbase.client.password";
+
+    // ---- Keys of the connector configuration ----
+
+    public static final String HBASE_ZK_QUORUM = "zkquorum";
+
+    public static final String HBASE_MASTER_CONFIG = "hbasemaster";
+
+    public static final String COLUMN_FAMILY = "columnfamily";
+
+    public static final String HBASE_USERNAME_CONFIG = "username";
+
+    public static final String HBASE_PASSWORD_CONFIG = "password";
+
+    // ---- Defaults ----
+
+    public static final String DEFAULT_COLUMN_FAMILY = "DefaultColumnFamily";
+
+    /** Constants holder; not meant to be instantiated. */
+    private HBaseConstants() {
+    }
+}
diff --git
a/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/config/HBaseSinkConfig.java
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/config/HBaseSinkConfig.java
new file mode 100644
index 00000000..ca66bd4f
--- /dev/null
+++
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/config/HBaseSinkConfig.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hbase.config;
+
+import io.openmessaging.KeyValue;
+
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Settings of the HBase sink connector.
+ *
+ * <p>An instance is filled from a {@link KeyValue} by {@link #load(KeyValue)}:
+ * every public setter {@code setXyz} is matched against the lower-cased key
+ * {@code xyz} (for example {@code setZkQuorum} reads {@code zkquorum}).
+ */
+public class HBaseSinkConfig {
+
+    /** Configuration keys that must be present for the connector to start. */
+    public static final Set<String> SINK_REQUEST_CONFIG = new HashSet<>();
+
+    static {
+        // Plain HashSet filled in a static block instead of double-brace
+        // initialisation, which would create an anonymous HashSet subclass.
+        SINK_REQUEST_CONFIG.add(HBaseConstants.HBASE_ZK_QUORUM);
+        SINK_REQUEST_CONFIG.add(HBaseConstants.COLUMN_FAMILY);
+    }
+
+    private String hbaseMaster;
+
+    private String zkQuorum;
+
+    private String columnFamily;
+
+    private String userName;
+
+    private String passWord;
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassWord() {
+        return passWord;
+    }
+
+    public void setPassWord(String passWord) {
+        this.passWord = passWord;
+    }
+
+    public String getHbaseMaster() {
+        return hbaseMaster;
+    }
+
+    public void setHbaseMaster(String hbaseMaster) {
+        this.hbaseMaster = hbaseMaster;
+    }
+
+    public String getZkQuorum() {
+        return zkQuorum;
+    }
+
+    public void setZkQuorum(String zkQuorum) {
+        this.zkQuorum = zkQuorum;
+    }
+
+    public String getColumnFamily() {
+        return columnFamily;
+    }
+
+    public void setColumnFamily(String columnFamily) {
+        this.columnFamily = columnFamily;
+    }
+
+    /**
+     * Populates this object from the given key/value configuration.
+     *
+     * @param props configuration whose keys are the lower-cased setter names
+     */
+    public void load(KeyValue props) {
+        properties2Object(props, this);
+    }
+
+    /**
+     * Copies matching entries of {@code p} into {@code object} through its
+     * public single-argument setters.
+     *
+     * @param p      source configuration
+     * @param object target whose {@code setXyz} methods are invoked
+     */
+    private void properties2Object(final KeyValue p, final Object object) {
+        for (Method method : object.getClass().getMethods()) {
+            String mn = method.getName();
+            if (!mn.startsWith("set")) {
+                continue;
+            }
+            try {
+                // Locale.ROOT keeps the derived key stable regardless of the
+                // JVM default locale (e.g. Turkish dotless-i lowercasing).
+                String key = mn.substring(3).toLowerCase(java.util.Locale.ROOT);
+
+                String property = p.getString(key);
+                if (property == null) {
+                    continue;
+                }
+                Class<?>[] pt = method.getParameterTypes();
+                if (pt.length == 0) {
+                    continue;
+                }
+                Object arg = convert(pt[0].getSimpleName(), property);
+                if (arg != null) {
+                    method.invoke(object, arg);
+                }
+            } catch (Throwable ignored) {
+                // Best effort: a value that cannot be parsed or applied leaves
+                // the corresponding field at its previous value.
+            }
+        }
+    }
+
+    /**
+     * Converts a raw configuration string to the setter's parameter type.
+     *
+     * @param typeName simple name of the setter's first parameter type
+     * @param raw      non-null configuration value
+     * @return the converted value, or {@code null} when the type is unsupported
+     */
+    private static Object convert(String typeName, String raw) {
+        switch (typeName) {
+            case "int":
+            case "Integer":
+                return Integer.parseInt(raw);
+            case "long":
+            case "Long":
+                return Long.parseLong(raw);
+            case "double":
+            case "Double":
+                return Double.parseDouble(raw);
+            case "boolean":
+            case "Boolean":
+                return Boolean.parseBoolean(raw);
+            case "float":
+            case "Float":
+                return Float.parseFloat(raw);
+            case "String":
+                return raw;
+            default:
+                return null;
+        }
+    }
+}
diff --git
a/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/helper/HBaseHelperClient.java
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/helper/HBaseHelperClient.java
new file mode 100644
index 00000000..ed4122a1
--- /dev/null
+++
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/helper/HBaseHelperClient.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hbase.helper;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.rocketmq.connect.hbase.config.HBaseConstants;
+import org.apache.rocketmq.connect.hbase.config.HBaseSinkConfig;
+
+import java.util.List;
+
+/**
+ * Small helper around an HBase {@link Connection} and its admin handle.
+ * It is created from an {@link HBaseSinkConfig} and offers table existence
+ * checks, table creation and batched inserts; call {@link #close()} when done.
+ */
+@Slf4j
+public class HBaseHelperClient {
+
+    private Configuration configuration;
+
+    private Connection connection;
+
+    private HBaseAdmin admin;
+
+    /**
+     * Opens the connection and obtains the admin handle.
+     *
+     * @param hBaseSinkConfig connector settings (ZooKeeper quorum, optional
+     *                        master, user name and password)
+     * @throws RuntimeException if the connection or the admin cannot be obtained
+     */
+    public HBaseHelperClient(HBaseSinkConfig hBaseSinkConfig) {
+        getConnection(hBaseSinkConfig);
+        initAdmin();
+    }
+
+    /**
+     * @param tableName name of the table to look up
+     * @return {@code true} if the table exists; {@code false} if it does not
+     *         or no admin handle is available
+     * @throws RuntimeException wrapping any failure of the lookup
+     */
+    public boolean tableExists(String tableName) {
+        try {
+            if (admin == null) {
+                return false;
+            }
+            return admin.tableExists(TableName.valueOf(tableName));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates the table with the given column families; does nothing except
+     * logging a warning when the table already exists.
+     *
+     * @param tableName      name of the table to create
+     * @param columnFamilies column families to add; may be empty or {@code null}
+     * @throws RuntimeException wrapping any failure of the creation
+     */
+    public void createTable(String tableName, List<String> columnFamilies) {
+        if (tableExists(tableName)) {
+            log.warn("table already exist, {}", tableName);
+            return;
+        }
+
+        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
+        // Add the column families.
+        if (CollectionUtils.isNotEmpty(columnFamilies)) {
+            columnFamilies.forEach(columnFamily -> {
+                HColumnDescriptor f = new HColumnDescriptor(columnFamily);
+                hTableDescriptor.addFamily(f);
+            });
+        }
+        try {
+            // Create the table.
+            admin.createTable(hTableDescriptor);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Writes all puts to the given table.
+     *
+     * @param tableName target table
+     * @param puts      rows to write
+     * @throws RuntimeException wrapping any failure of the write
+     */
+    public void batchInsert(String tableName, List<Put> puts) {
+        // try-with-resources: the Table handle must be released after use,
+        // otherwise every batch leaks one handle.
+        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
+            table.put(puts);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Builds the client configuration from the sink config and connects. */
+    private void getConnection(HBaseSinkConfig hBaseSinkConfig) {
+        configuration = HBaseConfiguration.create();
+        configuration.set(HBaseConstants.HBASE_ZOOKEEPER_QUORUM, hBaseSinkConfig.getZkQuorum());
+
+        // The remaining settings are optional and only applied when non-blank.
+        if (StringUtils.isNotBlank(hBaseSinkConfig.getHbaseMaster())) {
+            configuration.set(HBaseConstants.HBASE_MASTER, hBaseSinkConfig.getHbaseMaster());
+        }
+        if (StringUtils.isNotBlank(hBaseSinkConfig.getUserName())) {
+            configuration.set(HBaseConstants.HBASE_CLIENT_USERNAME, hBaseSinkConfig.getUserName());
+        }
+        if (StringUtils.isNotBlank(hBaseSinkConfig.getPassWord())) {
+            configuration.set(HBaseConstants.HBASE_CLIENT_PASSWORD, hBaseSinkConfig.getPassWord());
+        }
+        try {
+            connection = ConnectionFactory.createConnection(configuration);
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot connect to hbase server! ", e);
+        }
+    }
+
+    /** Obtains the admin handle from the already opened connection. */
+    private void initAdmin() {
+        if (connection == null) {
+            throw new RuntimeException("Cannot connect to hbase server!");
+        }
+        try {
+            admin = (HBaseAdmin) connection.getAdmin();
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot get admin from hbase server! ", e);
+        }
+    }
+
+    /**
+     * Closes the admin handle and the connection. Failures are logged, not
+     * rethrown, and a failure on the admin does not prevent the connection
+     * from being closed.
+     */
+    public void close() {
+        if (admin != null) {
+            try {
+                admin.close();
+            } catch (Exception e) {
+                log.error("error when closing", e);
+            }
+        }
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Exception e) {
+                log.error("error when closing", e);
+            }
+        }
+    }
+}
diff --git
a/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkConnector.java
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkConnector.java
new file mode 100644
index 00000000..ae808521
--- /dev/null
+++
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkConnector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hbase.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import org.apache.rocketmq.connect.hbase.config.HBaseSinkConfig;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Sink connector entry point: validates the required configuration keys on
+ * start and hands the received configuration to every {@link HBaseSinkTask}.
+ */
+public class HBaseSinkConnector extends SinkConnector {
+
+    /** Configuration received in {@link #start(KeyValue)}; cleared on stop. */
+    private KeyValue keyValue;
+
+    /**
+     * Stores the configuration after checking that every key listed in
+     * {@link HBaseSinkConfig#SINK_REQUEST_CONFIG} is present.
+     *
+     * @throws RuntimeException naming the first missing key encountered
+     */
+    @Override
+    public void start(KeyValue value) {
+        HBaseSinkConfig.SINK_REQUEST_CONFIG.stream()
+            .filter(requiredKey -> !value.containsKey(requiredKey))
+            .findFirst()
+            .ifPresent(missingKey -> {
+                throw new RuntimeException("Request config key: " + missingKey);
+            });
+
+        this.keyValue = value;
+    }
+
+    @Override
+    public void stop() {
+        this.keyValue = null;
+    }
+
+    /**
+     * @param maxTasks number of task configurations to produce
+     * @return a list holding the connector configuration once per task
+     */
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        final List<KeyValue> perTaskConfigs = new ArrayList<>();
+        int remaining = maxTasks;
+        while (remaining-- > 0) {
+            perTaskConfigs.add(this.keyValue);
+        }
+        return perTaskConfigs;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return HBaseSinkTask.class;
+    }
+}
diff --git
a/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkTask.java
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkTask.java
new file mode 100644
index 00000000..45d3b6f0
--- /dev/null
+++
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hbase.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.rocketmq.connect.hbase.config.HBaseSinkConfig;
+import org.apache.rocketmq.connect.hbase.helper.HBaseHelperClient;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Sink task that writes {@link ConnectRecord}s to HBase.
+ *
+ * <p>The record's schema name is used as the table name, the value of the
+ * first schema field as the row key, and every non-null field value is
+ * stored as a string in a column named after the field, inside the
+ * configured column family. Missing tables are created on the fly.
+ */
+public class HBaseSinkTask extends SinkTask {
+    public HBaseSinkConfig config;
+
+    private HBaseHelperClient helperClient;
+
+    /**
+     * Groups the records by table and writes one batch per table.
+     *
+     * @param sinkRecords records to write; {@code null} or empty is a no-op
+     * @throws ConnectException if a record has no fields or a null row key
+     */
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        if (sinkRecords == null || sinkRecords.isEmpty()) {
+            return;
+        }
+        // Loop-invariant: encode the column family once for the whole batch.
+        final byte[] family = Bytes.toBytes(this.config.getColumnFamily());
+
+        Map<String, List<Put>> putsByTable = new HashMap<>();
+        for (ConnectRecord connectRecord : sinkRecords) {
+            String tableName = connectRecord.getSchema().getName();
+            putsByTable.computeIfAbsent(tableName, ignored -> new ArrayList<>())
+                .add(toPut(connectRecord, tableName, family));
+        }
+
+        putsByTable.forEach((tableName, puts) -> {
+            if (!this.helperClient.tableExists(tableName)) {
+                this.helperClient.createTable(tableName,
+                    Collections.singletonList(this.config.getColumnFamily()));
+            }
+            this.helperClient.batchInsert(tableName, puts);
+        });
+    }
+
+    /** Converts one record into a Put keyed by the value of its first field. */
+    private Put toPut(ConnectRecord connectRecord, String tableName, byte[] family) {
+        final List<Field> fields = connectRecord.getSchema().getFields();
+        if (fields == null || fields.isEmpty()) {
+            throw new ConnectException("Record schema '" + tableName + "' has no fields; cannot derive a row key");
+        }
+        final Struct structData = (Struct) connectRecord.getData();
+        final Object rowKey = structData.get(fields.get(0));
+        if (rowKey == null) {
+            throw new ConnectException("Row key field '" + fields.get(0).getName()
+                + "' is null for table '" + tableName + "'");
+        }
+
+        Put put = new Put(Bytes.toBytes(rowKey.toString()));
+        for (Field field : fields) {
+            if (field == null) {
+                continue;
+            }
+            Object value = structData.get(field);
+            if (value == null) {
+                // A null value has no string form to store; skip the column
+                // instead of failing the whole batch with an NPE.
+                continue;
+            }
+            put.addColumn(family, Bytes.toBytes(field.getName()), Bytes.toBytes(value.toString()));
+        }
+        return put;
+    }
+
+    /** Loads the configuration and opens the HBase client. */
+    @Override
+    public void start(KeyValue keyValue) {
+        this.config = new HBaseSinkConfig();
+        this.config.load(keyValue);
+        this.helperClient = new HBaseHelperClient(this.config);
+    }
+
+    /** Closes the HBase client if {@link #start(KeyValue)} created one. */
+    @Override
+    public void stop() {
+        if (this.helperClient != null) {
+            this.helperClient.close();
+        }
+    }
+}
diff --git
a/connectors/rocketmq-connect-hbase/src/test/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkTaskTest.java
b/connectors/rocketmq-connect-hbase/src/test/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkTaskTest.java
new file mode 100644
index 00000000..c226e819
--- /dev/null
+++
b/connectors/rocketmq-connect-hbase/src/test/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkTaskTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hbase.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.*;
+import io.openmessaging.internal.DefaultKeyValue;
+import junit.framework.TestCase;
+import org.apache.rocketmq.connect.hbase.config.HBaseConstants;
+import org.apache.rocketmq.connect.hbase.config.HBaseSinkConfig;
+import org.apache.rocketmq.connect.hbase.helper.HBaseHelperClient;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Manual/integration checks for the HBase sink.
+ *
+ * <p>NOTE(review): {@link #main(String[])} and {@link #testClient()} talk to
+ * the endpoint in {@code zkHost}; they presumably need a reachable HBase
+ * compatible server and valid credentials - confirm before enabling in CI.
+ */
+public class HBaseSinkTaskTest extends TestCase {
+
+    private static final String zkHost = "ld-m5ejp1wxrnytqnmz1-proxy-lindorm-pub.lindorm.rds.aliyuncs.com:30020";
+
+    private static final String columnFamily = "cf";
+
+    private static final String userName = "root";
+
+    private static final String password = "xxxxxx";
+
+    /** Builds records for two tables and pushes them through a sink task. */
+    public static void main(String[] args) {
+        List<ConnectRecord> records = new ArrayList<>();
+        String param0 = "1001";
+
+        // Schema and record for the first table.
+        Schema schema = SchemaBuilder.struct()
+            .name("tableName")
+            .field("c1", SchemaBuilder.string().build())
+            .field("c2", SchemaBuilder.string().build())
+            .build();
+        Struct struct = new Struct(schema);
+        struct.put("c1", param0);
+        struct.put("c2", String.format("test-data-%s", param0));
+
+        // Schema and record for the second table.
+        Schema schema2 = SchemaBuilder.struct()
+            .name("t1")
+            .field("c1", SchemaBuilder.string().build())
+            .field("c2", SchemaBuilder.string().build())
+            .build();
+        Struct struct2 = new Struct(schema2);
+        // Populate struct2 itself; previously these values were written to
+        // struct a second time and struct2 stayed empty.
+        struct2.put("c1", param0);
+        struct2.put("c2", String.format("test-data-%s", param0));
+
+        for (int i = 0; i < 4; i++) {
+            ConnectRecord record = new ConnectRecord(
+                new RecordPartition(new ConcurrentHashMap<>()),
+                new RecordOffset(new HashMap<>()),
+                System.currentTimeMillis(),
+                schema,
+                struct
+            );
+            records.add(record);
+
+            // Pair schema2 with its own struct2 rather than with struct.
+            ConnectRecord record2 = new ConnectRecord(
+                new RecordPartition(new ConcurrentHashMap<>()),
+                new RecordOffset(new HashMap<>()),
+                System.currentTimeMillis(),
+                schema2,
+                struct2
+            );
+            records.add(record2);
+        }
+
+        HBaseSinkTask task = new HBaseSinkTask();
+        KeyValue config = new DefaultKeyValue();
+        config.put(HBaseConstants.COLUMN_FAMILY, columnFamily);
+        config.put(HBaseConstants.HBASE_ZK_QUORUM, zkHost);
+        config.put(HBaseConstants.HBASE_USERNAME_CONFIG, userName);
+        config.put(HBaseConstants.HBASE_PASSWORD_CONFIG, password);
+
+        task.start(config);
+        try {
+            task.put(records);
+        } finally {
+            // Release the HBase client even if the write fails.
+            task.stop();
+        }
+    }
+
+    @Test
+    public void testClient() {
+        String tableName = "test";
+        HBaseSinkConfig config = new HBaseSinkConfig();
+        config.setColumnFamily(columnFamily);
+        config.setZkQuorum(zkHost);
+        config.setUserName(userName);
+        config.setPassWord(password);
+        HBaseHelperClient helperClient = new HBaseHelperClient(config);
+        try {
+            boolean flag = helperClient.tableExists(tableName);
+            Assert.assertFalse(flag);
+        } finally {
+            helperClient.close();
+        }
+    }
+
+    @Test
+    public void testConn() {
+        // try-with-resources closes the socket on every path.
+        try (Socket connect = new Socket()) {
+            // Establish the connection (100 ms timeout).
+            connect.connect(new InetSocketAddress("localhost", 2181), 100);
+            // Query the connection state through the socket itself.
+            boolean res = connect.isConnected();
+            // true means the endpoint is reachable.
+            System.out.println(res);
+        } catch (IOException e) {
+            // An unreachable endpoint surfaces as an exception; report it.
+            System.out.println("false");
+        }
+    }
+}
\ No newline at end of file