This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new cf49208c [OSPP2023] Feat: Support HBase Sink connector (#522)
cf49208c is described below

commit cf49208c816e47dc5a8776f206778b3265d0c1dd
Author: Ao Qiao <[email protected]>
AuthorDate: Sun Sep 24 19:39:30 2023 +0800

    [OSPP2023] Feat: Support HBase Sink connector (#522)
    
    * initial commit
    
    * commit
    
    * rename
    
    * finish source connector
    
    * finish sink connector
    
    * add license
    
    * Update README.md
    
    * Update connect-standalone.conf
    
    * init
    
    * commit
    
    * support hbase sink connector
    
    * merge master
    
    * merge master
    
    * add licence
    
    * remove redundant dependency
    
    * ci
---
 connectors/rocketmq-connect-hbase/README.md        |  30 +++
 connectors/rocketmq-connect-hbase/pom.xml          | 201 +++++++++++++++++++++
 .../connect/hbase/config/HBaseConstants.java       |  44 +++++
 .../connect/hbase/config/HBaseSinkConfig.java      | 127 +++++++++++++
 .../connect/hbase/helper/HBaseHelperClient.java    | 135 ++++++++++++++
 .../connect/hbase/sink/HBaseSinkConnector.java     |  57 ++++++
 .../rocketmq/connect/hbase/sink/HBaseSinkTask.java |  83 +++++++++
 .../connect/hbase/sink/HBaseSinkTaskTest.java      | 140 ++++++++++++++
 8 files changed, 817 insertions(+)

diff --git a/connectors/rocketmq-connect-hbase/README.md 
b/connectors/rocketmq-connect-hbase/README.md
new file mode 100644
index 00000000..f168ea7a
--- /dev/null
+++ b/connectors/rocketmq-connect-hbase/README.md
@@ -0,0 +1,30 @@
+
+##### HBaseSinkConnector fully-qualified name
+org.apache.rocketmq.connect.hbase.sink.HBaseSinkConnector
+
+**hbase-sink-connector** start
+
+```
+POST  http://${runtime-ip}:${runtime-port}/connectors/HBaseSinkConnector
+{
+    "connector.class":"org.apache.rocketmq.connect.hbase.sink.HBaseSinkConnector",
+    "zkquorum":"localhost:2181",
+    "columnfamily":"cf",
+    "username":"default",
+    "password":"123456",
+    "connect.topicnames":"testHBaseTopic",
+    "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+    "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
+
+##### parameter configuration
+
+| parameter          | effect                                         | required        | default |
+|--------------------|------------------------------------------------|-----------------|---------|
+| zkquorum           | The Endpoint of the Zookeeper server           | yes             | null    |
+| columnfamily       | The Column Family of the Destination table     | yes             | null    |
+| username           | The UserName to login to HBase server          | no              | null    |
+| password           | The Password of the UserName                   | no              | null    |
+| hbasemaster        | The Endpoint of HBase Master server            | no              | null    |
+| connect.topicnames | RocketMQ topic for sink connector to read from | yes (sink only) | null    |
diff --git a/connectors/rocketmq-connect-hbase/pom.xml 
b/connectors/rocketmq-connect-hbase/pom.xml
new file mode 100644
index 00000000..689efc33
--- /dev/null
+++ b/connectors/rocketmq-connect-hbase/pom.xml
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
+       license agreements. See the NOTICE file distributed with this work for 
additional
+       information regarding copyright ownership. The ASF licenses this file to
+       You under the Apache License, Version 2.0 (the "License"); you may not 
use
+       this file except in compliance with the License. You may obtain a copy 
of
+       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required
+       by applicable law or agreed to in writing, software distributed under 
the
+       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS
+       OF ANY KIND, either express or implied. See the License for the specific
+       language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-hbase</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <name>connect-hbase</name>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        </license>
+    </licenses>
+
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                        <exclude>README-CN.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.4</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.lz4</groupId>
+            <artifactId>lz4-java</artifactId>
+            <version>1.8.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.83</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>RELEASE</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>2.4.8</version>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.24</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/config/HBaseConstants.java
 
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/config/HBaseConstants.java
new file mode 100644
index 00000000..050ee689
--- /dev/null
+++ 
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/config/HBaseConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hbase.config;
+
+/**
+ * String constants used by the HBase connector: the keys users put in the connector
+ * configuration, and the HBase client property names those values are copied onto.
+ */
+public class HBaseConstants {
+
+    // ---- HBase client property names (set on the HBase Configuration) ----
+
+    /** Client property holding the ZooKeeper quorum. */
+    public static final String HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+
+    /** Client property holding the HBase master endpoint. */
+    public static final String HBASE_MASTER = "hbase.master";
+
+    /** Client property holding the user name. */
+    public static final String HBASE_CLIENT_USERNAME = "hbase.client.username";
+
+    /** Client property holding the password. */
+    public static final String HBASE_CLIENT_PASSWORD = "hbase.client.password";
+
+    // ---- connector configuration keys ----
+
+    /** Connector key for the ZooKeeper quorum. */
+    public static final String HBASE_ZK_QUORUM = "zkquorum";
+
+    /** Connector key for the HBase master endpoint. */
+    public static final String HBASE_MASTER_CONFIG = "hbasemaster";
+
+    /** Connector key for the column family of the destination table. */
+    public static final String COLUMN_FAMILY = "columnfamily";
+
+    /** Connector key for the user name. */
+    public static final String HBASE_USERNAME_CONFIG = "username";
+
+    /** Connector key for the password. */
+    public static final String HBASE_PASSWORD_CONFIG = "password";
+
+    // ---- defaults ----
+
+    /** Name of a default column family. */
+    public static final String DEFAULT_COLUMN_FAMILY = "DefaultColumnFamily";
+}
diff --git 
a/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/config/HBaseSinkConfig.java
 
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/config/HBaseSinkConfig.java
new file mode 100644
index 00000000..ca66bd4f
--- /dev/null
+++ 
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/config/HBaseSinkConfig.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hbase.config;
+
+import io.openmessaging.KeyValue;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+/**
+ * Settings of the HBase sink connector.
+ *
+ * <p>Values are populated by {@link #load(KeyValue)}, which maps each public setter
+ * {@code setXyz} onto the lower-cased configuration key {@code xyz}; for example
+ * {@code setZkQuorum} is fed from {@code zkquorum} and {@code setColumnFamily} from
+ * {@code columnfamily}.
+ */
+public class HBaseSinkConfig {
+
+    /**
+     * Configuration keys that must be present for the sink connector to start.
+     * The set is read-only.
+     */
+    public static final Set<String> SINK_REQUEST_CONFIG = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(HBaseConstants.HBASE_ZK_QUORUM, HBaseConstants.COLUMN_FAMILY)));
+
+    private String hbaseMaster;
+
+    private String zkQuorum;
+
+    private String columnFamily;
+
+    private String userName;
+
+    private String passWord;
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassWord() {
+        return passWord;
+    }
+
+    public void setPassWord(String passWord) {
+        this.passWord = passWord;
+    }
+
+    public String getHbaseMaster() {
+        return hbaseMaster;
+    }
+
+    public void setHbaseMaster(String hbaseMaster) {
+        this.hbaseMaster = hbaseMaster;
+    }
+
+    public String getZkQuorum() {
+        return zkQuorum;
+    }
+
+    public void setZkQuorum(String zkQuorum) {
+        this.zkQuorum = zkQuorum;
+    }
+
+    public String getColumnFamily() {
+        return columnFamily;
+    }
+
+    public void setColumnFamily(String columnFamily) {
+        this.columnFamily = columnFamily;
+    }
+
+    /**
+     * Copies every matching entry of {@code props} into this object through its setters.
+     * Keys without a matching setter, and values that cannot be converted, are skipped.
+     *
+     * @param props connector configuration to read from
+     */
+    public void load(KeyValue props) {
+        properties2Object(props, this);
+    }
+
+    /**
+     * Invokes each single-argument public {@code setXyz} method of {@code object} with the
+     * value stored in {@code p} under the key {@code xyz} (setter suffix, lower-cased).
+     * This is best-effort: a property that is absent, of an unsupported type, or that
+     * fails to convert leaves the corresponding field untouched.
+     */
+    private void properties2Object(final KeyValue p, final Object object) {
+        if (p == null) {
+            return;
+        }
+        for (Method method : object.getClass().getMethods()) {
+            final String methodName = method.getName();
+            final Class<?>[] parameterTypes = method.getParameterTypes();
+            if (!methodName.startsWith("set") || parameterTypes.length != 1) {
+                continue;
+            }
+            try {
+                // Locale.ROOT keeps the key independent of the JVM default locale.
+                final String key = methodName.substring(3).toLowerCase(Locale.ROOT);
+                final String property = p.getString(key);
+                if (property == null) {
+                    continue;
+                }
+                final Object arg = convert(parameterTypes[0], property);
+                if (arg != null) {
+                    method.invoke(object, arg);
+                }
+            } catch (Exception ignored) {
+                // Deliberately best-effort: a malformed or inaccessible property must not
+                // prevent the remaining properties from being loaded.
+            }
+        }
+    }
+
+    /**
+     * Converts the textual property to the given setter parameter type.
+     *
+     * @return the converted value, or {@code null} when the type is not supported
+     * @throws NumberFormatException if a numeric type is requested and {@code raw} is malformed
+     */
+    private static Object convert(final Class<?> type, final String raw) {
+        if (type == int.class || type == Integer.class) {
+            return Integer.parseInt(raw);
+        }
+        if (type == long.class || type == Long.class) {
+            return Long.parseLong(raw);
+        }
+        if (type == double.class || type == Double.class) {
+            return Double.parseDouble(raw);
+        }
+        if (type == boolean.class || type == Boolean.class) {
+            return Boolean.parseBoolean(raw);
+        }
+        if (type == float.class || type == Float.class) {
+            return Float.parseFloat(raw);
+        }
+        if (type == String.class) {
+            return raw;
+        }
+        return null;
+    }
+}
diff --git 
a/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/helper/HBaseHelperClient.java
 
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/helper/HBaseHelperClient.java
new file mode 100644
index 00000000..ed4122a1
--- /dev/null
+++ 
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/helper/HBaseHelperClient.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hbase.helper;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.rocketmq.connect.hbase.config.HBaseConstants;
+import org.apache.rocketmq.connect.hbase.config.HBaseSinkConfig;
+
+import java.util.List;
+
+/**
+ * Small wrapper around an HBase {@link Connection} and its admin handle that offers
+ * table-existence checks, table creation and batched puts.
+ */
+@Slf4j
+public class HBaseHelperClient {
+
+    private Configuration configuration;
+
+    private Connection connection;
+
+    private HBaseAdmin admin;
+
+    /**
+     * Opens the connection described by {@code hBaseSinkConfig} and obtains its admin handle.
+     *
+     * @param hBaseSinkConfig ZooKeeper quorum plus optional master endpoint and credentials
+     * @throws RuntimeException if the connection or the admin handle cannot be obtained
+     */
+    public HBaseHelperClient(HBaseSinkConfig hBaseSinkConfig) {
+        getConnection(hBaseSinkConfig);
+        try {
+            initAdmin();
+        } catch (RuntimeException e) {
+            // Do not leak the connection opened above when the admin handle is unavailable.
+            closeQuietly(connection, "connection");
+            throw e;
+        }
+    }
+
+    /**
+     * @param tableName name of the table to look up
+     * @return {@code true} if the table exists; {@code false} if it does not or no admin
+     *         handle is available
+     * @throws RuntimeException wrapping any failure of the lookup
+     */
+    public boolean tableExists(String tableName) {
+        try {
+            if (admin == null) {
+                return false;
+            }
+            return admin.tableExists(TableName.valueOf(tableName));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates {@code tableName} with the given column families. Logs a warning and does
+     * nothing when the table already exists.
+     *
+     * @param tableName      name of the table to create
+     * @param columnFamilies column families to add; may be null or empty
+     * @throws RuntimeException wrapping any failure of the creation
+     */
+    public void createTable(String tableName, List<String> columnFamilies) {
+        if (tableExists(tableName)) {
+            log.warn("table already exist, {}", tableName);
+            return;
+        }
+
+        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
+        // add the column families
+        if (CollectionUtils.isNotEmpty(columnFamilies)) {
+            columnFamilies.forEach(columnFamily -> {
+                HColumnDescriptor f = new HColumnDescriptor(columnFamily);
+                hTableDescriptor.addFamily(f);
+            });
+        }
+        try {
+            // create the table
+            admin.createTable(hTableDescriptor);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Writes all {@code puts} to {@code tableName}. The table handle is released after
+     * the write, whether it succeeds or not.
+     *
+     * @param tableName destination table
+     * @param puts      mutations to apply; nothing is done when null or empty
+     * @throws RuntimeException wrapping any failure of the write
+     */
+    public void batchInsert(String tableName, List<Put> puts) {
+        if (puts == null || puts.isEmpty()) {
+            return;
+        }
+        // try-with-resources: the Table handle obtained per call must be closed.
+        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
+            table.put(puts);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Builds the client configuration from {@code hBaseSinkConfig} and opens the connection. */
+    private void getConnection(HBaseSinkConfig hBaseSinkConfig) {
+        configuration = HBaseConfiguration.create();
+        configuration.set(HBaseConstants.HBASE_ZOOKEEPER_QUORUM, hBaseSinkConfig.getZkQuorum());
+
+        // Master endpoint and credentials are optional; only set them when provided.
+        if (StringUtils.isNotBlank(hBaseSinkConfig.getHbaseMaster())) {
+            configuration.set(HBaseConstants.HBASE_MASTER, hBaseSinkConfig.getHbaseMaster());
+        }
+        if (StringUtils.isNotBlank(hBaseSinkConfig.getUserName())) {
+            configuration.set(HBaseConstants.HBASE_CLIENT_USERNAME, hBaseSinkConfig.getUserName());
+        }
+        if (StringUtils.isNotBlank(hBaseSinkConfig.getPassWord())) {
+            configuration.set(HBaseConstants.HBASE_CLIENT_PASSWORD, hBaseSinkConfig.getPassWord());
+        }
+        try {
+            connection = ConnectionFactory.createConnection(configuration);
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot connect to hbase server! ", e);
+        }
+    }
+
+    /** Obtains the admin handle from the already opened connection. */
+    private void initAdmin() {
+        if (connection == null) {
+            throw new RuntimeException("Cannot connect to hbase server!");
+        }
+        try {
+            admin = (HBaseAdmin) connection.getAdmin();
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot get admin from hbase server! ", e);
+        }
+    }
+
+    /**
+     * Releases the admin handle and the connection. Each is closed independently, so a
+     * failure (or absence) of one does not prevent closing the other; failures are logged.
+     */
+    public void close() {
+        closeQuietly(admin, "admin");
+        closeQuietly(connection, "connection");
+    }
+
+    /** Closes {@code resource} if it is non-null, logging instead of propagating failures. */
+    private void closeQuietly(AutoCloseable resource, String name) {
+        if (resource == null) {
+            return;
+        }
+        try {
+            resource.close();
+        } catch (Exception e) {
+            log.error("error when closing {}", name, e);
+        }
+    }
+}
diff --git 
a/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkConnector.java
 
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkConnector.java
new file mode 100644
index 00000000..ae808521
--- /dev/null
+++ 
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkConnector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hbase.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import org.apache.rocketmq.connect.hbase.config.HBaseSinkConfig;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Sink connector for HBase: checks that the required configuration keys are present
+ * and hands the same configuration to every {@link HBaseSinkTask}.
+ */
+public class HBaseSinkConnector extends SinkConnector {
+
+    /** Configuration captured by {@link #start(KeyValue)}; reset to null by {@link #stop()}. */
+    private KeyValue keyValue;
+
+    /**
+     * @param maxTasks number of task configurations to produce
+     * @return a list holding the connector configuration {@code maxTasks} times
+     */
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        final List<KeyValue> taskConfigList = new ArrayList<>();
+        for (int remaining = maxTasks; remaining > 0; remaining--) {
+            taskConfigList.add(this.keyValue);
+        }
+        return taskConfigList;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return HBaseSinkTask.class;
+    }
+
+    /**
+     * Validates and stores the connector configuration.
+     *
+     * @param value connector configuration
+     * @throws RuntimeException if a key listed in {@link HBaseSinkConfig#SINK_REQUEST_CONFIG} is missing
+     */
+    @Override
+    public void start(KeyValue value) {
+        HBaseSinkConfig.SINK_REQUEST_CONFIG.stream()
+                .filter(requiredKey -> !value.containsKey(requiredKey))
+                .findFirst()
+                .ifPresent(missingKey -> {
+                    throw new RuntimeException("Request config key: " + missingKey);
+                });
+
+        this.keyValue = value;
+    }
+
+    @Override
+    public void stop() {
+        this.keyValue = null;
+    }
+}
diff --git 
a/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkTask.java
 
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkTask.java
new file mode 100644
index 00000000..45d3b6f0
--- /dev/null
+++ 
b/connectors/rocketmq-connect-hbase/src/main/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hbase.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.rocketmq.connect.hbase.config.HBaseSinkConfig;
+import org.apache.rocketmq.connect.hbase.helper.HBaseHelperClient;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class HBaseSinkTask extends SinkTask {
+    public HBaseSinkConfig config;
+
+    private HBaseHelperClient helperClient;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        if (sinkRecords == null || sinkRecords.size() < 1) {
+            return;
+        }
+        Map<String, List<Put>> valueMap = new HashMap<>();
+        for (ConnectRecord connectRecord : sinkRecords) {
+            String tableName = connectRecord.getSchema().getName();
+            List<Put> puts = valueMap.getOrDefault(tableName, new 
ArrayList<>());
+            final List<Field> fields = connectRecord.getSchema().getFields();
+            final Struct structData = (Struct) connectRecord.getData();
+            String index = structData.get(fields.get(0)).toString();
+            Put put = new Put(Bytes.toBytes(index));
+            fields.stream().filter(Objects::nonNull)
+                    .forEach(field -> 
put.addColumn(Bytes.toBytes(this.config.getColumnFamily()),
+                            Bytes.toBytes(field.getName()),
+                            Bytes.toBytes(structData.get(field).toString())));
+            puts.add(put);
+            valueMap.put(tableName, puts);
+        }
+
+        valueMap.forEach((tableName, puts) -> {
+            if (!this.helperClient.tableExists(tableName)) {
+                this.helperClient.createTable(tableName, 
Collections.singletonList(this.config.getColumnFamily()));
+            }
+            this.helperClient.batchInsert(tableName, puts);
+        });
+    }
+
+    @Override
+    public void start(KeyValue keyValue) {
+        this.config = new HBaseSinkConfig();
+        this.config.load(keyValue);
+        this.helperClient = new HBaseHelperClient(this.config);
+    }
+
+    @Override
+    public void stop() {
+        this.helperClient.close();
+    }
+}
diff --git 
a/connectors/rocketmq-connect-hbase/src/test/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkTaskTest.java
 
b/connectors/rocketmq-connect-hbase/src/test/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkTaskTest.java
new file mode 100644
index 00000000..c226e819
--- /dev/null
+++ 
b/connectors/rocketmq-connect-hbase/src/test/java/org/apache/rocketmq/connect/hbase/sink/HBaseSinkTaskTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hbase.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.*;
+import io.openmessaging.internal.DefaultKeyValue;
+import junit.framework.TestCase;
+import org.apache.rocketmq.connect.hbase.config.HBaseConstants;
+import org.apache.rocketmq.connect.hbase.config.HBaseSinkConfig;
+import org.apache.rocketmq.connect.hbase.helper.HBaseHelperClient;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Manual and connectivity checks for the HBase sink. {@link #main(String[])} and
+ * {@link #testClient()} talk to the server configured in {@code zkHost}.
+ */
+public class HBaseSinkTaskTest extends TestCase {
+
+    private static final String zkHost = "ld-m5ejp1wxrnytqnmz1-proxy-lindorm-pub.lindorm.rds.aliyuncs.com:30020";
+    private static final String columnFamily = "cf";
+
+    private static final String userName = "root";
+
+    private static final String password = "xxxxxx";
+
+    /** Writes four records to each of two tables through a started {@link HBaseSinkTask}. */
+    public static void main(String[] args) {
+        List<ConnectRecord> records = new ArrayList<>();
+        // build schema
+        Schema schema = SchemaBuilder.struct()
+                .name("tableName")
+                .field("c1", SchemaBuilder.string().build())
+                .field("c2", SchemaBuilder.string().build())
+                .build();
+        // build record
+        String param0 = "1001";
+        Struct struct = new Struct(schema);
+        struct.put("c1", param0);
+        struct.put("c2", String.format("test-data-%s", param0));
+
+        Schema schema2 = SchemaBuilder.struct()
+            .name("t1")
+            .field("c1", SchemaBuilder.string().build())
+            .field("c2", SchemaBuilder.string().build())
+            .build();
+        // build record for the second table; it must be populated itself, not struct again
+        Struct struct2 = new Struct(schema2);
+        struct2.put("c1", param0);
+        struct2.put("c2", String.format("test-data-%s", param0));
+
+        for (int i = 0; i < 4; i++) {
+            ConnectRecord record = new ConnectRecord(
+                // record partition and offset
+                new RecordPartition(new ConcurrentHashMap<>()),
+                new RecordOffset(new HashMap<>()),
+                System.currentTimeMillis(),
+                schema,
+                struct
+            );
+            records.add(record);
+
+            ConnectRecord record2 = new ConnectRecord(
+                // record partition and offset
+                new RecordPartition(new ConcurrentHashMap<>()),
+                new RecordOffset(new HashMap<>()),
+                System.currentTimeMillis(),
+                schema2,
+                struct2
+            );
+            records.add(record2);
+
+        }
+
+        HBaseSinkTask task = new HBaseSinkTask();
+        KeyValue config = new DefaultKeyValue();
+        config.put(HBaseConstants.COLUMN_FAMILY, columnFamily);
+        config.put(HBaseConstants.HBASE_ZK_QUORUM, zkHost);
+        config.put(HBaseConstants.HBASE_USERNAME_CONFIG, userName);
+        config.put(HBaseConstants.HBASE_PASSWORD_CONFIG, password);
+
+        task.start(config);
+        try {
+            task.put(records);
+        } finally {
+            // release the HBase connection even when the write fails
+            task.stop();
+        }
+    }
+
+    @Test
+    public void testClient() {
+        String tableName = "test";
+        HBaseSinkConfig config = new HBaseSinkConfig();
+        config.setColumnFamily(columnFamily);
+        config.setZkQuorum(zkHost);
+        config.setUserName(userName);
+        config.setPassWord(password);
+        HBaseHelperClient helperClient = new HBaseHelperClient(config);
+        try {
+            boolean flag = helperClient.tableExists(tableName);
+            Assert.assertFalse(flag);
+        } finally {
+            helperClient.close();
+        }
+    }
+
+    /** Prints whether a TCP connection to localhost:2181 can be opened within 100 ms. */
+    @Test
+    public void testConn() {
+        try (Socket connect = new Socket()) {
+            // open the connection
+            connect.connect(new InetSocketAddress("localhost", 2181), 100);
+            // query the connection state; true means reachable
+            boolean res = connect.isConnected();
+            System.out.println(res);
+        } catch (IOException e) {
+            // an unreachable endpoint surfaces as an exception; report it as false
+            System.out.println("false");
+        }
+    }
+}
\ No newline at end of file


Reply via email to