This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 68bda94a4 [Feature][Connector-V2][Hbase] Introduce hbase sink 
connector (#4049)
68bda94a4 is described below

commit 68bda94a4ce1839404ddb098a29eddef1df3758d
Author: Tyrantlucifer <[email protected]>
AuthorDate: Wed Feb 8 15:51:17 2023 +0800

    [Feature][Connector-V2][Hbase] Introduce hbase sink connector (#4049)
    
    * [Feature][Connector-V2][Hbase] Introduce hbase sink connector
    
    * [Feature][Connector-V2][Hbase] Introduce hbase sink connector
    
    * [Feature][Connector-V2][Hbase] Add scope for connector-hbase
    
    * [Feature][Connector-V2][Hbase] Add docs
    
    * [Feature][Connector-V2][Hbase] Add e2e test case
    
    * [Feature][Connector-V2][Hbase] Update release-note
    
    * [Feature][Connector-V2][Hbase] Remove useless parameter
    
    * [Feature][Connector-V2][Hbase] Fix code style
    
    * [Feature][Connector-V2][Hbase] Test all data types
    
    * [Feature][Connector-V2][Hbase] Optimize e2e test case
---
 config/plugin_config                               |   2 +
 docs/en/connector-v2/sink/Hbase.md                 | 121 ++++++++++++++
 plugin-mapping.properties                          |   1 +
 release-note.md                                    |   1 +
 .../common/config/TypesafeConfigUtils.java         |   9 +
 seatunnel-connectors-v2/connector-hbase/pom.xml    |  52 ++++++
 .../seatunnel/hbase/config/HbaseConfig.java        |  96 +++++++++++
 .../seatunnel/hbase/config/HbaseParameters.java    | 110 +++++++++++++
 .../hbase/exception/HbaseConnectorException.java   |  35 ++++
 .../connectors/seatunnel/hbase/sink/HbaseSink.java |  99 +++++++++++
 .../seatunnel/hbase/sink/HbaseSinkFactory.java     |  61 +++++++
 .../seatunnel/hbase/sink/HbaseSinkWriter.java      | 181 +++++++++++++++++++++
 .../seatunnel/hbase/HbaseFactoryTest.java          |  31 ++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   6 +
 .../connector-hbase-e2e/pom.xml                    |  39 +++++
 .../seatunnel/e2e/connector/hbase/HbaseIT.java     | 140 ++++++++++++++++
 .../src/test/resources/fake-to-hbase.conf          |  50 ++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 19 files changed, 1036 insertions(+)

diff --git a/config/plugin_config b/config/plugin_config
index b3548ab64..77531f5e6 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -70,4 +70,6 @@ connector-slack
 connector-socket
 connector-starrocks
 connector-tablestore
+connector-selectdb-cloud
+connector-hbase
 --end--
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/Hbase.md 
b/docs/en/connector-v2/sink/Hbase.md
new file mode 100644
index 000000000..a470c6090
--- /dev/null
+++ b/docs/en/connector-v2/sink/Hbase.md
@@ -0,0 +1,121 @@
+# Hbase
+
+> Hbase sink connector
+
+## Description
+
+Output data to Hbase
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+
+## Options
+
+| name               | type    | required | default value   |
+|--------------------|---------|----------|-----------------|
+| zookeeper_quorum   | string  | yes      | -               |
+| table              | string  | yes      | -               |
+| rowkey_column      | list    | yes      | -               |
+| family_name        | config  | yes      | -               |
+| rowkey_delimiter   | string  | no       | ""              |
+| version_column     | string  | no       | -               |
+| null_mode          | string  | no       | skip            |
+| wal_write          | boolean | no       | false           |
+| write_buffer_size  | int     | no       | 8 * 1024 * 1024 |
+| encoding           | string  | no       | utf8            |
+| hbase_extra_config | config  | no       | -               |
+| common-options     |         | no       | -               |
+
+### zookeeper_quorum [string]
+
+The zookeeper cluster host of hbase, example: 
"hadoop001:2181,hadoop002:2181,hadoop003:2181"
+
+### table [string]
+
+The table name you want to write, example: "seatunnel"
+
+### rowkey_column [list]
+
+The column name list of row keys, example: ["id", "uuid"]
+
+### family_name [config]
+
+The mapping from fields to column family names. For example, given an 
upstream row like the following:
+
+| id  | name          | age |
+|-----|---------------|-----|
+| 1   | tyrantlucifer | 27  |
+
+If `id` is the row key and the other fields go to different families, you can 
assign
+
+family_name {
+    name = "info1"
+    age = "info2"
+}
+
+this means that `name` will be written to the family `info1` and the `age` 
will be written to the family `info2`
+
+If you want all fields written to the same family, you can assign
+
+family_name {
+    all_columns = "info"
+}
+
+this means that all fields will be written to the family `info`
+
+### rowkey_delimiter [string]
+
+The delimiter used to join multiple row key columns, default `""`
+
+### version_column [string]
+
+The version column name, you can use it to assign timestamp for hbase record
+
+### null_mode [string]
+
+The mode of writing null value, support [`skip`, `empty`], default `skip`
+
+- skip: When the field is null, connector will not write this field to hbase
+- empty: When the field is null, connector will write an empty value for 
this field
+
+### wal_write [boolean]
+
+The wal log write flag, default `false`
+
+### write_buffer_size [int]
+
+The write buffer size of hbase client, default `8 * 1024 * 1024`
+
+### encoding [string]
+
+The encoding of string field, support [`utf8`, `gbk`], default `utf8`
+
+### hbase_extra_config [config]
+
+The extra configuration of hbase
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
+
+## Example
+
+```hocon
+
+Hbase {
+  zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+  table = "seatunnel_test"
+  rowkey_column = ["name"]
+  family_name {
+    all_columns = seatunnel
+  }
+}
+
+```
+
+## Changelog
+
+### next version
+
+- Add hbase sink connector 
([4049](https://github.com/apache/incubator-seatunnel/pull/4049))
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index a34e8f866..a4dd2f0aa 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -170,4 +170,5 @@ seatunnel.source.TDengine = connector-tdengine
 seatunnel.sink.TDengine = connector-tdengine
 seatunnel.source.Persistiq = connector-http-persistiq
 seatunnel.sink.SelectDBCloud = connector-selectdb-cloud
+seatunnel.sink.Hbase = connector-hbase
 
diff --git a/release-note.md b/release-note.md
index 43ac32276..c9c3eace9 100644
--- a/release-note.md
+++ b/release-note.md
@@ -12,6 +12,7 @@
 - [ALL]Add FieldMapper Transform #3781
 ### Connectors
 - [Elasticsearch] Support https protocol & compatible with opensearch
+- [Hbase] Add hbase sink connector #4049
 ### Formats
 - [Canal]Support read canal format message #3950
 
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
index 7d215dc95..c91d2c67c 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
 
 import lombok.NonNull;
 
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -108,4 +109,12 @@ public final class TypesafeConfigUtils {
                                                        @NonNull List<? extends 
Config> defaultValue) {
         return config.hasPath(configKey) ? config.getConfigList(configKey) : 
defaultValue;
     }
+
+    public static Map<String, String> configToMap(Config config) { // copies the entries of config into a new String-to-String map
+        Map<String, String> configMap = new HashMap<>();
+        config.entrySet().forEach(entry -> {
+            configMap.put(entry.getKey(), 
entry.getValue().unwrapped().toString());
+        }); // key: entry.getKey(); value: toString() of the unwrapped config value
+        return configMap; // a fresh HashMap owned by the caller
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-hbase/pom.xml 
b/seatunnel-connectors-v2/connector-hbase/pom.xml
new file mode 100644
index 000000000..e292ac77d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-hbase</artifactId>
+
+    <properties>
+        <hbase.version>2.4.10</hbase.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${hbase.version}</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
new file mode 100644
index 000000000..481591d56
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+import java.util.Map;
+
+public class HbaseConfig { // static Option definitions (key, type, default, description) for the Hbase connector
+
+    private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024; // = 8388608; used as the default of WRITE_BUFFER_SIZE
+
+    public static final Option<String> ZOOKEEPER_QUORUM = 
Options.key("zookeeper_quorum")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Hbase zookeeper quorum");
+
+    public static final Option<String> TABLE = Options.key("table")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Hbase table name");
+
+    public static final Option<List<String>> ROWKEY_COLUMNS = 
Options.key("rowkey_column")
+            .listType()
+            .noDefaultValue()
+            .withDescription("Hbase rowkey column");
+
+    public static final Option<String> ROWKEY_DELIMITER = 
Options.key("rowkey_delimiter")
+            .stringType()
+            .defaultValue("") // empty string by default
+            .withDescription("Hbase rowkey join delimiter");
+
+    public static final Option<String> VERSION_COLUMN = 
Options.key("version_column")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Hbase record version column used for assigning 
timestamp of records");
+
+    public static final Option<NullMode> NULL_MODE = Options.key("null_mode")
+            .enumType(NullMode.class)
+            .defaultValue(NullMode.SKIP)
+            .withDescription("The processing mode for writing null values");
+
+    public static final Option<Boolean> WAL_WRITE = Options.key("wal_write")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("The flag of whether write wal log");
+
+    public static final Option<Integer> WRITE_BUFFER_SIZE = 
Options.key("write_buffer_size")
+            .intType()
+            .defaultValue(DEFAULT_BUFFER_SIZE)
+            .withDescription("Hbase client write buffer size");
+
+    public static final Option<EnCoding> ENCODING = Options.key("encoding")
+            .enumType(EnCoding.class)
+            .defaultValue(EnCoding.UTF8)
+            .withDescription("Hbase record encoding");
+
+    public static final Option<Map<String, String>> FAMILY_NAME = 
Options.key("family_name")
+            .mapType()
+            .noDefaultValue()
+            .withDescription("Hbase column family name");
+
+    public static final Option<Map<String, String>> HBASE_EXTRA_CONFIG = 
Options.key("hbase_extra_config")
+            .mapType()
+            .noDefaultValue()
+            .withDescription("Hbase extra config");
+
+    public enum NullMode { // allowed values of the null_mode option
+        SKIP,
+        EMPTY;
+    }
+
+    public enum EnCoding { // allowed values of the encoding option
+        UTF8,
+        GBK;
+    }
+
+    private HbaseConfig() {} // private constructor: the class only holds static constants
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
new file mode 100644
index 000000000..30459cbcf
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.config;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WRITE_BUFFER_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
+
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Builder
+@Getter
+public class HbaseParameters implements Serializable { // Hbase sink settings resolved from the plugin Config by buildWithConfig
+
+    private String zookeeperQuorum;
+
+    private String table;
+
+    private List<String> rowkeyColumns;
+
+    private Map<String, String> familyNames;
+
+    private String versionColumn; // left unset by buildWithConfig when version_column is absent
+
+    private Map<String, String> hbaseExtraConfig; // left unset by buildWithConfig when hbase_extra_config is absent
+
+    @Builder.Default
+    private String rowkeyDelimiter = ROWKEY_DELIMITER.defaultValue();
+
+    @Builder.Default
+    private HbaseConfig.NullMode nullMode = NULL_MODE.defaultValue();
+
+    @Builder.Default
+    private boolean walWrite = WAL_WRITE.defaultValue();
+
+    @Builder.Default
+    private int writeBufferSize = WRITE_BUFFER_SIZE.defaultValue();
+
+    @Builder.Default
+    private HbaseConfig.EnCoding enCoding = ENCODING.defaultValue();
+
+    public static HbaseParameters buildWithConfig(Config pluginConfig) { // maps plugin config keys onto a new HbaseParameters
+        HbaseParametersBuilder builder = HbaseParameters.builder();
+
+        // required parameters
+        
builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
+        builder.table(pluginConfig.getString(TABLE.key()));
+        
builder.rowkeyColumns(pluginConfig.getStringList(ROWKEY_COLUMNS.key()));
+        
builder.familyNames(TypesafeConfigUtils.configToMap(pluginConfig.getConfig(FAMILY_NAME.key())));
+
+        // optional parameters
+        if (pluginConfig.hasPath(ROWKEY_DELIMITER.key())) {
+            
builder.rowkeyDelimiter(pluginConfig.getString(ROWKEY_DELIMITER.key()));
+        }
+        if (pluginConfig.hasPath(VERSION_COLUMN.key())) {
+            
builder.versionColumn(pluginConfig.getString(VERSION_COLUMN.key()));
+        }
+        if (pluginConfig.hasPath(NULL_MODE.key())) {
+            String nullMode = pluginConfig.getString(NULL_MODE.key());
+            
builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase(java.util.Locale.ROOT))); // Locale.ROOT: enum lookup must not depend on the JVM default locale
+        }
+        if (pluginConfig.hasPath(WAL_WRITE.key())) {
+            builder.walWrite(pluginConfig.getBoolean(WAL_WRITE.key()));
+        }
+        if (pluginConfig.hasPath(WRITE_BUFFER_SIZE.key())) {
+            
builder.writeBufferSize(pluginConfig.getInt(WRITE_BUFFER_SIZE.key()));
+        }
+        if (pluginConfig.hasPath(ENCODING.key())) {
+            String encoding = pluginConfig.getString(ENCODING.key());
+            
builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase(java.util.Locale.ROOT))); // Locale.ROOT for the same reason as null_mode
+        }
+        if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
+            Config extraConfig = 
pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
+            
builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig));
+        }
+        return builder.build(); // options not present in the config keep their @Builder.Default values
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorException.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorException.java
new file mode 100644
index 000000000..e2ebc3616
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class HbaseConnectorException extends SeaTunnelRuntimeException { // Hbase connector exception; each constructor only forwards to the matching super constructor
+    public HbaseConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public HbaseConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public HbaseConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
new file mode 100644
index 000000000..338e10dbe
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@AutoService(SeaTunnelSink.class)
+public class HbaseSink extends AbstractSimpleSink<SeaTunnelRow, Void> { // sink entry point: validates config, resolves column indexes, creates HbaseSinkWriter
+
+    private Config pluginConfig;
+
+    private SeaTunnelRowType seaTunnelRowType;
+
+    private HbaseParameters hbaseParameters; // assigned in prepare(); setTypeInfo() dereferences it
+
+    private List<Integer> rowkeyColumnIndexes = new ArrayList<>(); // filled in setTypeInfo(), one index per configured rowkey column, in configured order
+
+    private int versionColumnIndex = -1; // stays -1 when no version column is configured
+
+    @Override
+    public String getPluginName() {
+        return HbaseSinkFactory.IDENTIFIER;
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.pluginConfig = pluginConfig;
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
+                ZOOKEEPER_QUORUM.key(), TABLE.key(), ROWKEY_COLUMNS.key(), 
FAMILY_NAME.key());
+        if (!result.isSuccess()) { // unsuccessful check is reported as CONFIG_VALIDATION_FAILED
+            throw new 
HbaseConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                            getPluginName(), PluginType.SINK, 
result.getMsg()));
+        }
+        this.hbaseParameters = HbaseParameters.buildWithConfig(pluginConfig);
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        for (String rowkeyColumn : hbaseParameters.getRowkeyColumns()) { // translate rowkey column names into row field indexes
+            
this.rowkeyColumnIndexes.add(seaTunnelRowType.indexOf(rowkeyColumn));
+        }
+        if (hbaseParameters.getVersionColumn() != null) { // version column is optional
+            this.versionColumnIndex = 
seaTunnelRowType.indexOf(hbaseParameters.getVersionColumn());
+        }
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return seaTunnelRowType;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) throws IOException {
+        return new HbaseSinkWriter(seaTunnelRowType, hbaseParameters, 
rowkeyColumnIndexes, versionColumnIndex); // context is not passed on to the writer
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
new file mode 100644
index 000000000..f08e86276
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WRITE_BUFFER_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class HbaseSinkFactory implements TableSinkFactory { // declares the factory identifier and the option rule of the Hbase sink
+
+    public static final String IDENTIFIER = "Hbase"; // also returned by HbaseSink.getPluginName()
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(ZOOKEEPER_QUORUM, TABLE, ROWKEY_COLUMNS, FAMILY_NAME) // same four keys that HbaseSink.prepare() checks
+                .optional(ROWKEY_DELIMITER,
+                        VERSION_COLUMN,
+                        NULL_MODE,
+                        WAL_WRITE,
+                        WRITE_BUFFER_SIZE,
+                        ENCODING,
+                        HBASE_EXTRA_CONFIG)
+                .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
new file mode 100644
index 000000000..18a4f6dec
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Sink writer that converts every {@link SeaTunnelRow} into an HBase {@link Put} and hands it to a
+ * {@link BufferedMutator} opened on the configured table.
+ *
+ * <p>The rowkey is built from the configured rowkey columns, the optional version column supplies
+ * the cell timestamp, and every remaining column is written as one cell whose qualifier is the
+ * field name.
+ */
+public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    /** Key of the family-name mapping that applies to every column. */
+    private static final String ALL_COLUMNS = "all_columns";
+
+    /** Column family used when the configuration does not provide one for a field. */
+    private static final String FALLBACK_FAMILY_NAME = "value";
+
+    private final Configuration hbaseConfiguration = HBaseConfiguration.create();
+
+    private final Connection hbaseConnection;
+
+    private final BufferedMutator hbaseMutator;
+
+    private final SeaTunnelRowType seaTunnelRowType;
+
+    private final HbaseParameters hbaseParameters;
+
+    private final List<Integer> rowkeyColumnIndexes;
+
+    private final int versionColumnIndex;
+
+    private final String defaultFamilyName;
+
+    /**
+     * Opens the HBase connection and the buffered mutator used for all writes.
+     *
+     * @param seaTunnelRowType    row type describing the names and types of the incoming fields
+     * @param hbaseParameters     sink parameters (quorum, table, family names, ...)
+     * @param rowkeyColumnIndexes indexes of the fields that are joined into the rowkey
+     * @param versionColumnIndex  index of the field holding the cell timestamp, or {@code -1} when
+     *                            no version column is used
+     * @throws IOException if the connection or the buffered mutator cannot be created
+     */
+    public HbaseSinkWriter(SeaTunnelRowType seaTunnelRowType,
+                           HbaseParameters hbaseParameters,
+                           List<Integer> rowkeyColumnIndexes,
+                           int versionColumnIndex) throws IOException {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.hbaseParameters = hbaseParameters;
+        this.rowkeyColumnIndexes = rowkeyColumnIndexes;
+        this.versionColumnIndex = versionColumnIndex;
+
+        // a single family mapping may be the "all_columns" shortcut; otherwise fall back to "value"
+        this.defaultFamilyName = hbaseParameters.getFamilyNames().size() == 1
+                ? hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, FALLBACK_FAMILY_NAME)
+                : FALLBACK_FAMILY_NAME;
+
+        // initialize hbase configuration
+        hbaseConfiguration.set("hbase.zookeeper.quorum", hbaseParameters.getZookeeperQuorum());
+        if (hbaseParameters.getHbaseExtraConfig() != null) {
+            hbaseParameters.getHbaseExtraConfig().forEach(hbaseConfiguration::set);
+        }
+        // initialize hbase connection
+        hbaseConnection = ConnectionFactory.createConnection(hbaseConfiguration);
+        // initialize hbase mutator; release the connection again if that fails so it does not leak
+        try {
+            BufferedMutatorParams bufferedMutatorParams =
+                    new BufferedMutatorParams(TableName.valueOf(hbaseParameters.getTable()))
+                            .pool(HTable.getDefaultExecutor(hbaseConfiguration))
+                            .writeBufferSize(hbaseParameters.getWriteBufferSize());
+            hbaseMutator = hbaseConnection.getBufferedMutator(bufferedMutatorParams);
+        } catch (IOException | RuntimeException e) {
+            try {
+                hbaseConnection.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Converts the row into a {@link Put} and passes it to the buffered mutator.
+     *
+     * @param element the row to write
+     * @throws IOException if the mutator rejects the mutation
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        Put put = convertRowToPut(element);
+        hbaseMutator.mutate(put);
+    }
+
+    /**
+     * Closes the buffered mutator and then the connection. The connection is closed even when
+     * closing the mutator fails.
+     *
+     * @throws IOException if closing the mutator or the connection fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            if (hbaseMutator != null) {
+                hbaseMutator.close();
+            }
+        } finally {
+            if (hbaseConnection != null) {
+                hbaseConnection.close();
+            }
+        }
+    }
+
+    /**
+     * Builds the {@link Put} for one row: rowkey, timestamp, durability and one cell per field that
+     * is neither part of the rowkey nor the version column.
+     */
+    private Put convertRowToPut(SeaTunnelRow row) {
+        byte[] rowkey = getRowkeyFromRow(row);
+        long timestamp = HConstants.LATEST_TIMESTAMP;
+        if (versionColumnIndex != -1) {
+            // the version column is cast to Long, so it has to hold a non-null Long value
+            timestamp = (Long) row.getField(versionColumnIndex);
+        }
+        Put put = new Put(rowkey, timestamp);
+        if (!hbaseParameters.isWalWrite()) {
+            put.setDurability(Durability.SKIP_WAL);
+        }
+        List<Integer> writeColumnIndexes = IntStream.range(0, row.getArity()).boxed()
+                .filter(index -> !rowkeyColumnIndexes.contains(index))
+                .filter(index -> index != versionColumnIndex)
+                .collect(Collectors.toList());
+        for (Integer writeColumnIndex : writeColumnIndexes) {
+            String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
+            String familyName = hbaseParameters.getFamilyNames().getOrDefault(fieldName, defaultFamilyName);
+            byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
+            if (bytes != null) {
+                put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(fieldName), bytes);
+            } else {
+                // null field: either write an empty cell or leave the cell out, depending on null mode
+                switch (hbaseParameters.getNullMode()) {
+                    case EMPTY:
+                        put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(fieldName), HConstants.EMPTY_BYTE_ARRAY);
+                        break;
+                    case SKIP:
+                    default:
+                        break;
+                }
+            }
+        }
+        return put;
+    }
+
+    /**
+     * Joins the string form of the rowkey fields with the configured delimiter.
+     *
+     * @throws NullPointerException if one of the rowkey fields of the row is {@code null}
+     */
+    private byte[] getRowkeyFromRow(SeaTunnelRow row) {
+        String[] rowkeyValues = new String[rowkeyColumnIndexes.size()];
+        for (int i = 0; i < rowkeyColumnIndexes.size(); i++) {
+            int columnIndex = rowkeyColumnIndexes.get(i);
+            Object value = row.getField(columnIndex);
+            if (value == null) {
+                // fail with the offending column instead of an anonymous NullPointerException
+                throw new NullPointerException(String.format(
+                        "Hbase rowkey column [%s] must not be null", seaTunnelRowType.getFieldName(columnIndex)));
+            }
+            rowkeyValues[i] = value.toString();
+        }
+        return Bytes.toBytes(String.join(hbaseParameters.getRowkeyDelimiter(), rowkeyValues));
+    }
+
+    /**
+     * Serializes the field at {@code index} according to its SeaTunnel type.
+     *
+     * @return the serialized value, or {@code null} when the field itself is {@code null}
+     * @throws HbaseConnectorException if the field type is not one of the supported types
+     */
+    private byte[] convertColumnToBytes(SeaTunnelRow row, int index) {
+        Object field = row.getField(index);
+        if (field == null) {
+            return null;
+        }
+        SeaTunnelDataType<?> fieldType = seaTunnelRowType.getFieldType(index);
+        switch (fieldType.getSqlType()) {
+            case TINYINT:
+                return Bytes.toBytes((Byte) field);
+            case SMALLINT:
+                return Bytes.toBytes((Short) field);
+            case INT:
+                return Bytes.toBytes((Integer) field);
+            case BIGINT:
+                return Bytes.toBytes((Long) field);
+            case FLOAT:
+                return Bytes.toBytes((Float) field);
+            case DOUBLE:
+                return Bytes.toBytes((Double) field);
+            case BOOLEAN:
+                return Bytes.toBytes((Boolean) field);
+            case STRING:
+                return field.toString()
+                        .getBytes(Charset.forName(hbaseParameters.getEnCoding().toString()));
+            default:
+                String errorMsg = String.format("Hbase connector does not support this column type [%s]", fieldType.getSqlType());
+                throw new HbaseConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/HbaseFactoryTest.java
 
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/HbaseFactoryTest.java
new file mode 100644
index 000000000..2d9042f6d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/HbaseFactoryTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase;
+
+import org.apache.seatunnel.connectors.seatunnel.hbase.sink.HbaseSinkFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Smoke test for {@link HbaseSinkFactory}. */
+public class HbaseFactoryTest {
+
+    /** The sink factory must be able to build its option rule. */
+    @Test
+    public void optionRuleTest() {
+        HbaseSinkFactory sinkFactory = new HbaseSinkFactory();
+        Assertions.assertNotNull(sinkFactory.optionRule());
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 285e9602d..86c259c3b 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -70,6 +70,7 @@
         <module>connector-maxcompute</module>
         <module>connector-tdengine</module>
         <module>connector-selectdb-cloud</module>
+        <module>connector-hbase</module>
     </modules>
 
     <dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 3752bdc84..cdd21efb8 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -467,6 +467,12 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-hbase</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
 
                 <!-- jdbc driver -->
                 <dependency>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml
new file mode 100644
index 000000000..93bff9236
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-hbase-e2e</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-hbase</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
new file mode 100644
index 000000000..e91ecadf6
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hbase;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test of the HBase sink: starts an HBase container, creates the target table, runs the
+ * {@code /fake-to-hbase.conf} job and checks the rows that arrived in the table.
+ */
+@Slf4j
+@Disabled("Hbase docker e2e case need user add mapping information of between container id and ip address in hosts file")
+public class HbaseIT extends TestSuiteBase implements TestResource {
+
+    private static final String IMAGE = "harisekhon/hbase:latest";
+
+    private static final int PORT = 2181;
+
+    private static final String HOST = "hbase-e2e";
+
+    private static final String TABLE_NAME = "seatunnel_test";
+
+    private static final String FAMILY_NAME = "info";
+
+    private final Configuration hbaseConfiguration = HBaseConfiguration.create();
+
+    private Connection hbaseConnection;
+
+    private Admin admin;
+
+    private TableName table;
+
+    private GenericContainer<?> hbaseContainer;
+
+    /** Starts the HBase container and creates the test table. */
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        hbaseContainer = new GenericContainer<>(DockerImageName.parse(IMAGE))
+                .withNetwork(NETWORK)
+                .withNetworkAliases(HOST)
+                .withExposedPorts(PORT)
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+                .waitingFor(new HostPortWaitStrategy().withStartupTimeout(Duration.ofMinutes(2)));
+        Startables.deepStart(Stream.of(hbaseContainer)).join();
+        log.info("Hbase container started");
+        this.initialize();
+    }
+
+    /**
+     * Releases the admin, the connection and the container. Each resource is closed even when
+     * closing an earlier one fails.
+     */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (Objects.nonNull(admin)) {
+                admin.close();
+            }
+        } finally {
+            try {
+                if (Objects.nonNull(hbaseConnection)) {
+                    hbaseConnection.close();
+                }
+            } finally {
+                if (Objects.nonNull(hbaseContainer)) {
+                    hbaseContainer.close();
+                }
+            }
+        }
+    }
+
+    /** Connects to HBase and creates the test table with a single column family. */
+    private void initialize() throws IOException {
+        hbaseConfiguration.set("hbase.zookeeper.quorum", HOST + ":" + PORT);
+        hbaseConnection = ConnectionFactory.createConnection(hbaseConfiguration);
+        admin = hbaseConnection.getAdmin();
+        table = TableName.valueOf(TABLE_NAME);
+        ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME.getBytes())
+                .setCompressionType(Compression.Algorithm.SNAPPY)
+                .setCompactionCompressionType(Compression.Algorithm.SNAPPY)
+                .build();
+        TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(table)
+                .setColumnFamily(familyDescriptor)
+                .build();
+        admin.createTable(tableDescriptor);
+        log.info("Hbase table has been initialized");
+    }
+
+    /**
+     * Runs the fake-to-hbase job and expects it to exit with code 0 and to leave five rows in the
+     * test table.
+     */
+    @TestTemplate
+    public void testHbaseSink(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/fake-to-hbase.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        ArrayList<Result> results = new ArrayList<>();
+        // close the table and the scanner once the scan is done
+        try (Table hbaseTable = hbaseConnection.getTable(table);
+             ResultScanner scanner = hbaseTable.getScanner(new Scan())) {
+            for (Result result : scanner) {
+                results.add(result);
+            }
+        }
+        // expected value first, so a failure reports "expected: 5" instead of the reverse
+        Assertions.assertEquals(5, results.size());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
new file mode 100644
index 000000000..04e250756
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+    FakeSource {
+        row.num = 5
+        schema {
+            fields {
+                name = string
+                age = int
+                c_tinyint = tinyint
+                c_smallint = smallint
+                c_bigint = bigint
+                c_float = float
+                c_double = double
+                c_boolean = boolean
+            }
+        }
+    }
+}
+
+sink {
+    Hbase {
+        zookeeper_quorum = "hbase-e2e:2181"
+        table = "seatunnel_test"
+        rowkey_column = ["name"]
+        family_name {
+            all_columns = info
+        }
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 118fe511a..6c1de4fa7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -49,6 +49,7 @@
         <module>connector-datahub-e2e</module>
         <module>connector-mongodb-e2e</module>
         <module>connector-selectdb-cloud-e2e</module>
+        <module>connector-hbase-e2e</module>
     </modules>
 
     <artifactId>seatunnel-connector-v2-e2e</artifactId>

Reply via email to