This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 68bda94a4 [Feature][Connector-V2][Hbase] Introduce hbase sink
connector (#4049)
68bda94a4 is described below
commit 68bda94a4ce1839404ddb098a29eddef1df3758d
Author: Tyrantlucifer <[email protected]>
AuthorDate: Wed Feb 8 15:51:17 2023 +0800
[Feature][Connector-V2][Hbase] Introduce hbase sink connector (#4049)
* [Feature][Connector-V2][Hbase] Introduce hbase sink connector
* [Feature][Connector-V2][Hbase] Introduce hbase sink connector
* [Feature][Connector-V2][Hbase] Add scope for connector-hbase
* [Feature][Connector-V2][Hbase] Add docs
* [Feature][Connector-V2][Hbase] Add e2e test case
* [Feature][Connector-V2][Hbase] Update release-note
* [Feature][Connector-V2][Hbase] Remove useless parameter
* [Feature][Connector-V2][Hbase] Fix code style
* [Feature][Connector-V2][Hbase] Test all data types
* [Feature][Connector-V2][Hbase] Optimize e2e test case
---
config/plugin_config | 2 +
docs/en/connector-v2/sink/Hbase.md | 121 ++++++++++++++
plugin-mapping.properties | 1 +
release-note.md | 1 +
.../common/config/TypesafeConfigUtils.java | 9 +
seatunnel-connectors-v2/connector-hbase/pom.xml | 52 ++++++
.../seatunnel/hbase/config/HbaseConfig.java | 96 +++++++++++
.../seatunnel/hbase/config/HbaseParameters.java | 110 +++++++++++++
.../hbase/exception/HbaseConnectorException.java | 35 ++++
.../connectors/seatunnel/hbase/sink/HbaseSink.java | 99 +++++++++++
.../seatunnel/hbase/sink/HbaseSinkFactory.java | 61 +++++++
.../seatunnel/hbase/sink/HbaseSinkWriter.java | 181 +++++++++++++++++++++
.../seatunnel/hbase/HbaseFactoryTest.java | 31 ++++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 6 +
.../connector-hbase-e2e/pom.xml | 39 +++++
.../seatunnel/e2e/connector/hbase/HbaseIT.java | 140 ++++++++++++++++
.../src/test/resources/fake-to-hbase.conf | 50 ++++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
19 files changed, 1036 insertions(+)
diff --git a/config/plugin_config b/config/plugin_config
index b3548ab64..77531f5e6 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -70,4 +70,6 @@ connector-slack
connector-socket
connector-starrocks
connector-tablestore
+connector-selectdb-cloud
+connector-hbase
--end--
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/Hbase.md
b/docs/en/connector-v2/sink/Hbase.md
new file mode 100644
index 000000000..a470c6090
--- /dev/null
+++ b/docs/en/connector-v2/sink/Hbase.md
@@ -0,0 +1,121 @@
+# Hbase
+
+> Hbase sink connector
+
+## Description
+
+Output data to Hbase
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|--------------------|---------|----------|-----------------|
+| zookeeper_quorum | string | yes | - |
+| table | string | yes | - |
+| rowkey_column | list | yes | - |
+| family_name | config | yes | - |
+| rowkey_delimiter | string | no | "" |
+| version_column | string | no | - |
+| null_mode | string | no | skip |
+| wal_write | boolean | yes | false |
+| write_buffer_size | string | no | 8 * 1024 * 1024 |
+| encoding | string | no | utf8 |
+| hbase_extra_config | string | no | - |
+| common-options | | no | - |
+
+### zookeeper_quorum [string]
+
+The zookeeper cluster host of hbase, example:
"hadoop001:2181,hadoop002:2181,hadoop003:2181"
+
+### table [string]
+
+The table name you want to write, example: "seatunnel"
+
+### rowkey_column [list]
+
+The column name list of row keys, example: ["id", "uuid"]
+
+### family_name [config]
+
+The family name mapping of fields. For example the row from upstream like the
following shown:
+
+| id | name | age |
+|-----|---------------|-----|
+| 1 | tyrantlucifer | 27 |
+
+id as the row key and other fields written to the different families, you can
assign
+
+family_name {
+ name = "info1"
+ age = "info2"
+}
+
+this means that `name` will be written to the family `info1` and the `age`
will be written to the family `info2`
+
+if you want other fields written to the same family, you can assign
+
+family_name {
+ all_columns = "info"
+}
+
+this means that all fields will be written to the family `info`
+
+### rowkey_delimiter [string]
+
+The delimiter of joining multi row keys, default `""`
+
+### version_column [string]
+
+The version column name, you can use it to assign timestamp for hbase record
+
+### null_mode [double]
+
+The mode of writing null value, support [`skip`, `empty`], default `skip`
+
+- skip: When the field is null, connector will not write this field to hbase
+- empty: When the field is null, connector will write generate empty value for
this field
+
+### wal_write [boolean]
+
+The wal log write flag, default `false`
+
+### write_buffer_size [int]
+
+The write buffer size of hbase client, default `8 * 1024 * 1024`
+
+### encoding [string]
+
+The encoding of string field, support [`utf8`, `gbk`], default `utf8`
+
+### hbase_extra_config [config]
+
+The extra configuration of hbase
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
+
+## Example
+
+```hocon
+
+Hbase {
+ zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+ table = "seatunnel_test"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = seatunnel
+ }
+}
+
+```
+
+## Changelog
+
+### next version
+
+- Add hbase sink connector
([4049](https://github.com/apache/incubator-seatunnel/pull/4049))
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index a34e8f866..a4dd2f0aa 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -170,4 +170,5 @@ seatunnel.source.TDengine = connector-tdengine
seatunnel.sink.TDengine = connector-tdengine
seatunnel.source.Persistiq = connector-http-persistiq
seatunnel.sink.SelectDBCloud = connector-selectdb-cloud
+seatunnel.sink.Hbase = connector-hbase
diff --git a/release-note.md b/release-note.md
index 43ac32276..c9c3eace9 100644
--- a/release-note.md
+++ b/release-note.md
@@ -12,6 +12,7 @@
- [ALL]Add FieldMapper Transform #3781
### Connectors
- [Elasticsearch] Support https protocol & compatible with opensearch
+- [Hbase] Add hbase sink connector #4049
### Formats
- [Canal]Support read canal format message #3950
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
index 7d215dc95..c91d2c67c 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
import lombok.NonNull;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -108,4 +109,12 @@ public final class TypesafeConfigUtils {
@NonNull List<? extends
Config> defaultValue) {
return config.hasPath(configKey) ? config.getConfigList(configKey) :
defaultValue;
}
+
+ public static Map<String, String> configToMap(Config config) {
+ Map<String, String> configMap = new HashMap<>();
+ config.entrySet().forEach(entry -> {
+ configMap.put(entry.getKey(),
entry.getValue().unwrapped().toString());
+ });
+ return configMap;
+ }
}
diff --git a/seatunnel-connectors-v2/connector-hbase/pom.xml
b/seatunnel-connectors-v2/connector-hbase/pom.xml
new file mode 100644
index 000000000..e292ac77d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-hbase</artifactId>
+
+ <properties>
+ <hbase.version>2.4.10</hbase.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
new file mode 100644
index 000000000..481591d56
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+import java.util.Map;
+
+public class HbaseConfig {
+
+ private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024;
+
+ public static final Option<String> ZOOKEEPER_QUORUM =
Options.key("zookeeper_quorum")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Hbase zookeeper quorum");
+
+ public static final Option<String> TABLE = Options.key("table")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Hbase table name");
+
+ public static final Option<List<String>> ROWKEY_COLUMNS =
Options.key("rowkey_column")
+ .listType()
+ .noDefaultValue()
+ .withDescription("Hbase rowkey column");
+
+ public static final Option<String> ROWKEY_DELIMITER =
Options.key("rowkey_delimiter")
+ .stringType()
+ .defaultValue("")
+ .withDescription("Hbase rowkey join delimiter");
+
+ public static final Option<String> VERSION_COLUMN =
Options.key("version_column")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Hbase record version column used for assigning
timestamp of records");
+
+ public static final Option<NullMode> NULL_MODE = Options.key("null_mode")
+ .enumType(NullMode.class)
+ .defaultValue(NullMode.SKIP)
+ .withDescription("The processing mode for writing null values");
+
+ public static final Option<Boolean> WAL_WRITE = Options.key("wal_write")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("The flag of whether write wal log");
+
+ public static final Option<Integer> WRITE_BUFFER_SIZE =
Options.key("write_buffer_size")
+ .intType()
+ .defaultValue(DEFAULT_BUFFER_SIZE)
+ .withDescription("Hbase client write buffer size");
+
+ public static final Option<EnCoding> ENCODING = Options.key("encoding")
+ .enumType(EnCoding.class)
+ .defaultValue(EnCoding.UTF8)
+ .withDescription("Hbase record encoding");
+
+ public static final Option<Map<String, String>> FAMILY_NAME =
Options.key("family_name")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("Hbase column family name");
+
+ public static final Option<Map<String, String>> HBASE_EXTRA_CONFIG =
Options.key("hbase_extra_config")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("Hbase extra config");
+
+ public enum NullMode {
+ SKIP,
+ EMPTY;
+ }
+
+ public enum EnCoding {
+ UTF8,
+ GBK;
+ }
+
+ private HbaseConfig() {}
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
new file mode 100644
index 000000000..30459cbcf
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.config;
+
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WRITE_BUFFER_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
+
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@Builder
+@Getter
+public class HbaseParameters implements Serializable {
+
+ private String zookeeperQuorum;
+
+ private String table;
+
+ private List<String> rowkeyColumns;
+
+ private Map<String, String> familyNames;
+
+ private String versionColumn;
+
+ private Map<String, String> hbaseExtraConfig;
+
+ @Builder.Default
+ private String rowkeyDelimiter = ROWKEY_DELIMITER.defaultValue();
+
+ @Builder.Default
+ private HbaseConfig.NullMode nullMode = NULL_MODE.defaultValue();
+
+ @Builder.Default
+ private boolean walWrite = WAL_WRITE.defaultValue();
+
+ @Builder.Default
+ private int writeBufferSize = WRITE_BUFFER_SIZE.defaultValue();
+
+ @Builder.Default
+ private HbaseConfig.EnCoding enCoding = ENCODING.defaultValue();
+
+ public static HbaseParameters buildWithConfig(Config pluginConfig) {
+ HbaseParametersBuilder builder = HbaseParameters.builder();
+
+ // required parameters
+
builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
+ builder.table(pluginConfig.getString(TABLE.key()));
+
builder.rowkeyColumns(pluginConfig.getStringList(ROWKEY_COLUMNS.key()));
+
builder.familyNames(TypesafeConfigUtils.configToMap(pluginConfig.getConfig(FAMILY_NAME.key())));
+
+ // optional parameters
+ if (pluginConfig.hasPath(ROWKEY_DELIMITER.key())) {
+
builder.rowkeyDelimiter(pluginConfig.getString(ROWKEY_DELIMITER.key()));
+ }
+ if (pluginConfig.hasPath(VERSION_COLUMN.key())) {
+
builder.versionColumn(pluginConfig.getString(VERSION_COLUMN.key()));
+ }
+ if (pluginConfig.hasPath(NULL_MODE.key())) {
+ String nullMode = pluginConfig.getString(NULL_MODE.key());
+
builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase()));
+ }
+ if (pluginConfig.hasPath(WAL_WRITE.key())) {
+ builder.walWrite(pluginConfig.getBoolean(WAL_WRITE.key()));
+ }
+ if (pluginConfig.hasPath(WRITE_BUFFER_SIZE.key())) {
+
builder.writeBufferSize(pluginConfig.getInt(WRITE_BUFFER_SIZE.key()));
+ }
+ if (pluginConfig.hasPath(ENCODING.key())) {
+ String encoding = pluginConfig.getString(ENCODING.key());
+
builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase()));
+ }
+ if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
+ Config extraConfig =
pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
+
builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig));
+ }
+ return builder.build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorException.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorException.java
new file mode 100644
index 000000000..e2ebc3616
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class HbaseConnectorException extends SeaTunnelRuntimeException {
+ public HbaseConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public HbaseConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public HbaseConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
new file mode 100644
index 000000000..338e10dbe
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
+
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@AutoService(SeaTunnelSink.class)
+public class HbaseSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+ private Config pluginConfig;
+
+ private SeaTunnelRowType seaTunnelRowType;
+
+ private HbaseParameters hbaseParameters;
+
+ private List<Integer> rowkeyColumnIndexes = new ArrayList<>();
+
+ private int versionColumnIndex = -1;
+
+ @Override
+ public String getPluginName() {
+ return HbaseSinkFactory.IDENTIFIER;
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ this.pluginConfig = pluginConfig;
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
+ ZOOKEEPER_QUORUM.key(), TABLE.key(), ROWKEY_COLUMNS.key(),
FAMILY_NAME.key());
+ if (!result.isSuccess()) {
+ throw new
HbaseConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(), PluginType.SINK,
result.getMsg()));
+ }
+ this.hbaseParameters = HbaseParameters.buildWithConfig(pluginConfig);
+ }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ for (String rowkeyColumn : hbaseParameters.getRowkeyColumns()) {
+
this.rowkeyColumnIndexes.add(seaTunnelRowType.indexOf(rowkeyColumn));
+ }
+ if (hbaseParameters.getVersionColumn() != null) {
+ this.versionColumnIndex =
seaTunnelRowType.indexOf(hbaseParameters.getVersionColumn());
+ }
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+ return seaTunnelRowType;
+ }
+
+ @Override
+ public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) throws IOException {
+ return new HbaseSinkWriter(seaTunnelRowType, hbaseParameters,
rowkeyColumnIndexes, versionColumnIndex);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
new file mode 100644
index 000000000..f08e86276
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
+
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WRITE_BUFFER_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class HbaseSinkFactory implements TableSinkFactory {
+
+ public static final String IDENTIFIER = "Hbase";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(ZOOKEEPER_QUORUM, TABLE, ROWKEY_COLUMNS, FAMILY_NAME)
+ .optional(ROWKEY_DELIMITER,
+ VERSION_COLUMN,
+ NULL_MODE,
+ WAL_WRITE,
+ WRITE_BUFFER_SIZE,
+ ENCODING,
+ HBASE_EXTRA_CONFIG)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
new file mode 100644
index 000000000..18a4f6dec
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+ private static final String ALL_COLUMNS = "all_columns";
+
+ private final Configuration hbaseConfiguration =
HBaseConfiguration.create();
+
+ private final Connection hbaseConnection;
+
+ private final BufferedMutator hbaseMutator;
+
+ private final SeaTunnelRowType seaTunnelRowType;
+
+ private final HbaseParameters hbaseParameters;
+
+ private final List<Integer> rowkeyColumnIndexes;
+
+ private final int versionColumnIndex;
+
+ private String defaultFamilyName = "value";
+
+ public HbaseSinkWriter(SeaTunnelRowType seaTunnelRowType,
+ HbaseParameters hbaseParameters,
+ List<Integer> rowkeyColumnIndexes,
+ int versionColumnIndex) throws IOException {
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.hbaseParameters = hbaseParameters;
+ this.rowkeyColumnIndexes = rowkeyColumnIndexes;
+ this.versionColumnIndex = versionColumnIndex;
+
+ if (hbaseParameters.getFamilyNames().size() == 1) {
+ defaultFamilyName =
hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, "value");
+ }
+
+ // initialize hbase configuration
+ hbaseConfiguration.set("hbase.zookeeper.quorum",
hbaseParameters.getZookeeperQuorum());
+ if (hbaseParameters.getHbaseExtraConfig() != null) {
+
hbaseParameters.getHbaseExtraConfig().forEach(hbaseConfiguration::set);
+ }
+ // initialize hbase connection
+ hbaseConnection =
ConnectionFactory.createConnection(hbaseConfiguration);
+ // initialize hbase mutator
+ BufferedMutatorParams bufferedMutatorParams = new
BufferedMutatorParams(TableName.valueOf(hbaseParameters.getTable()))
+ .pool(HTable.getDefaultExecutor(hbaseConfiguration))
+ .writeBufferSize(hbaseParameters.getWriteBufferSize());
+ hbaseMutator =
hbaseConnection.getBufferedMutator(bufferedMutatorParams);
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ Put put = convertRowToPut(element);
+ hbaseMutator.mutate(put);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (hbaseMutator != null) {
+ hbaseMutator.close();
+ }
+ if (hbaseConnection != null) {
+ hbaseConnection.close();
+ }
+ }
+
+ private Put convertRowToPut(SeaTunnelRow row) {
+ byte[] rowkey = getRowkeyFromRow(row);
+ long timestamp = HConstants.LATEST_TIMESTAMP;
+ if (versionColumnIndex != -1) {
+ timestamp = (Long) row.getField(versionColumnIndex);
+ }
+ Put put = new Put(rowkey, timestamp);
+ if (!hbaseParameters.isWalWrite()) {
+ put.setDurability(Durability.SKIP_WAL);
+ }
+ List<Integer> writeColumnIndexes = IntStream.range(0,
row.getArity()).boxed()
+ .filter(index -> !rowkeyColumnIndexes.contains(index))
+ .filter(index -> index != versionColumnIndex)
+ .collect(Collectors.toList());
+ for (Integer writeColumnIndex : writeColumnIndexes) {
+ String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
+ String familyName =
hbaseParameters.getFamilyNames().getOrDefault(fieldName, defaultFamilyName);
+ byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
+ if (bytes != null) {
+ put.addColumn(Bytes.toBytes(familyName),
Bytes.toBytes(fieldName), bytes);
+ } else {
+ switch (hbaseParameters.getNullMode()) {
+ case EMPTY:
+ put.addColumn(Bytes.toBytes(familyName),
Bytes.toBytes(fieldName), HConstants.EMPTY_BYTE_ARRAY);
+ break;
+ case SKIP:
+ default:
+ break;
+ }
+ }
+ }
+ return put;
+ }
+
+ private byte[] getRowkeyFromRow(SeaTunnelRow row) {
+ String[] rowkeyValues = new String[rowkeyColumnIndexes.size()];
+ for (int i = 0; i < rowkeyColumnIndexes.size(); i++) {
+ rowkeyValues[i] =
row.getField(rowkeyColumnIndexes.get(i)).toString();
+ }
+ return Bytes.toBytes(String.join(hbaseParameters.getRowkeyDelimiter(),
rowkeyValues));
+ }
+
+ private byte[] convertColumnToBytes(SeaTunnelRow row, int index) {
+ Object field = row.getField(index);
+ if (field == null) {
+ return null;
+ }
+ SeaTunnelDataType<?> fieldType = seaTunnelRowType.getFieldType(index);
+ switch (fieldType.getSqlType()) {
+ case TINYINT:
+ return Bytes.toBytes((Byte) field);
+ case SMALLINT:
+ return Bytes.toBytes((Short) field);
+ case INT:
+ return Bytes.toBytes((Integer) field);
+ case BIGINT:
+ return Bytes.toBytes((Long) field);
+ case FLOAT:
+ return Bytes.toBytes((Float) field);
+ case DOUBLE:
+ return Bytes.toBytes((Double) field);
+ case BOOLEAN:
+ return Bytes.toBytes((Boolean) field);
+ case STRING:
+ return field.toString()
+
.getBytes(Charset.forName(hbaseParameters.getEnCoding().toString()));
+ default:
+ String errorMsg = String.format("Hbase connector does not
support this column type [%s]", fieldType.getSqlType());
+ throw new
HbaseConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/HbaseFactoryTest.java
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/HbaseFactoryTest.java
new file mode 100644
index 000000000..2d9042f6d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/HbaseFactoryTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase;
+
+import org.apache.seatunnel.connectors.seatunnel.hbase.sink.HbaseSinkFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class HbaseFactoryTest {
+
+ @Test
+ public void optionRuleTest() {
+ Assertions.assertNotNull((new HbaseSinkFactory()).optionRule());
+ }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 285e9602d..86c259c3b 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -70,6 +70,7 @@
<module>connector-maxcompute</module>
<module>connector-tdengine</module>
<module>connector-selectdb-cloud</module>
+ <module>connector-hbase</module>
</modules>
<dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 3752bdc84..cdd21efb8 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -467,6 +467,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-hbase</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<!-- jdbc driver -->
<dependency>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml
new file mode 100644
index 000000000..93bff9236
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-hbase-e2e</artifactId>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-hbase</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
new file mode 100644
index 000000000..e91ecadf6
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hbase;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+@Slf4j
+@Disabled("Hbase docker e2e case need user add mapping information of between
container id and ip address in hosts file")
+public class HbaseIT extends TestSuiteBase implements TestResource {
+
+ private static final String IMAGE = "harisekhon/hbase:latest";
+
+ private static final int PORT = 2181;
+
+ private static final String HOST = "hbase-e2e";
+
+ private static final String TABLE_NAME = "seatunnel_test";
+
+ private static final String FAMILY_NAME = "info";
+
+ private final Configuration hbaseConfiguration =
HBaseConfiguration.create();
+
+ private Connection hbaseConnection;
+
+ private Admin admin;
+
+ private TableName table;
+
+ private GenericContainer<?> hbaseContainer;
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ hbaseContainer = new GenericContainer<>(DockerImageName.parse(IMAGE))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HOST)
+ .withExposedPorts(PORT)
+ .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+ .waitingFor(new
HostPortWaitStrategy().withStartupTimeout(Duration.ofMinutes(2)));
+ Startables.deepStart(Stream.of(hbaseContainer)).join();
+ log.info("Hbase container started");
+ this.initialize();
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ if (Objects.nonNull(admin)) {
+ admin.close();
+ }
+ if (Objects.nonNull(hbaseConnection)) {
+ hbaseConnection.close();
+ }
+ if (Objects.nonNull(hbaseContainer)) {
+ hbaseContainer.close();
+ }
+ }
+
+ private void initialize() throws IOException {
+ hbaseConfiguration.set("hbase.zookeeper.quorum", HOST + ":" + PORT);
+ hbaseConnection =
ConnectionFactory.createConnection(hbaseConfiguration);
+ admin = hbaseConnection.getAdmin();
+ table = TableName.valueOf(TABLE_NAME);
+ ColumnFamilyDescriptor familyDescriptor =
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME.getBytes())
+ .setCompressionType(Compression.Algorithm.SNAPPY)
+ .setCompactionCompressionType(Compression.Algorithm.SNAPPY)
+ .build();
+ TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(table)
+ .setColumnFamily(familyDescriptor)
+ .build();
+ admin.createTable(tableDescriptor);
+ log.info("Hbase table has been initialized");
+ }
+
+ @TestTemplate
+ public void testHbaseSink(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/fake-to-hbase.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Table hbaseTable = hbaseConnection.getTable(table);
+ Scan scan = new Scan();
+ ArrayList<Result> results = new ArrayList<>();
+ ResultScanner scanner = hbaseTable.getScanner(scan);
+ for (Result result : scanner) {
+ results.add(result);
+ }
+ Assertions.assertEquals(results.size(), 5);
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
new file mode 100644
index 000000000..04e250756
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema {
+ fields {
+ name = string
+ age = int
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_boolean = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase-e2e:2181"
+ table = "seatunnel_test"
+ rowkey_column = ["name"]
+ family_name {
+ all_columns = info
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 118fe511a..6c1de4fa7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -49,6 +49,7 @@
<module>connector-datahub-e2e</module>
<module>connector-mongodb-e2e</module>
<module>connector-selectdb-cloud-e2e</module>
+ <module>connector-hbase-e2e</module>
</modules>
<artifactId>seatunnel-connector-v2-e2e</artifactId>