This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f108a5e658 [Connector]Add hbase source connector (#6348)
f108a5e658 is described below
commit f108a5e658b261aa049e688264dcc50787611761
Author: TaoZex <[email protected]>
AuthorDate: Wed May 15 15:48:08 2024 +0800
[Connector]Add hbase source connector (#6348)
---
.../connector-v2/Error-Quick-Reference-Manual.md | 6 +
docs/en/connector-v2/source/Hbase.md | 91 ++++++++++
plugin-mapping.properties | 1 +
.../seatunnel/hbase/config/HbaseConfig.java | 6 +
.../seatunnel/hbase/config/HbaseParameters.java | 18 ++
.../hbase/exception/HbaseConnectorErrorCode.java | 43 +++++
.../hbase/format/HBaseDeserializationFormat.java | 93 ++++++++++
.../seatunnel/hbase/source/HbaseSource.java | 114 ++++++++++++
.../seatunnel/hbase/source/HbaseSourceFactory.java | 64 +++++++
.../seatunnel/hbase/source/HbaseSourceReader.java | 195 +++++++++++++++++++++
.../seatunnel/hbase/source/HbaseSourceSplit.java | 56 ++++++
.../hbase/source/HbaseSourceSplitEnumerator.java | 185 +++++++++++++++++++
.../seatunnel/hbase/source/HbaseSourceState.java | 36 ++++
.../seatunnel/hbase/utils/HbaseConnectionUtil.java | 48 +++++
.../seatunnel/e2e/connector/hbase/HbaseIT.java | 6 +
.../src/test/resources/hbase-to-assert.conf | 119 +++++++++++++
16 files changed, 1081 insertions(+)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 2519bfbe65..960bddc0eb 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -256,6 +256,12 @@ problems encountered by users.
|--------------|-------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------|
| FIRESTORE-01 | Close Firestore client failed | When users encounter this
error code, it is usually there are some problems with closing the Firestore
client, please check the Firestore is work |
+## Hbase Connector Error Codes
+
+| code | description |
solution
|
+|----------|-------------------------------|---------------------------------------------------------------------------------------------------------------------------------|
+| Hbase-01 | Build hbase connection failed | When users create Hbase database
connection, the connection failed. Check the Hbase configuration parameters
used and try again |
+
## FilterFieldTransform Error Codes
| code | description | solution
|
diff --git a/docs/en/connector-v2/source/Hbase.md
b/docs/en/connector-v2/source/Hbase.md
new file mode 100644
index 0000000000..677b827fb2
--- /dev/null
+++ b/docs/en/connector-v2/source/Hbase.md
@@ -0,0 +1,91 @@
+# Hbase
+
+> Hbase source connector
+
+## Description
+
+Read data from Apache Hbase.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|--------------------|--------|----------|---------------|
+| zookeeper_quorum | string | yes | - |
+| table | string | yes | - |
+| query_columns | list | yes | - |
+| schema | config | yes | - |
+| hbase_extra_config | config | no | - |
+| common-options | | no | - |
+
+### zookeeper_quorum [string]
+
+The zookeeper cluster host of hbase, example:
"hadoop001:2181,hadoop002:2181,hadoop003:2181"
+
+### table [string]
+
+The table name you want to read, example: "seatunnel"
+
+### query_columns [list]
+
+The column name which you want to query in the table. If you want to query the
rowkey column, please set "rowkey" in query_columns.
+Other column format should be: columnFamily:columnName, example: ["rowkey",
"columnFamily1:column1", "columnFamily1:column2", "columnFamily2:column1"]
+
+### schema [config]
+
+Hbase uses byte arrays for storage. Therefore, you need to configure data
types for each column in a table. For more information, see:
[guide](../../concept/schema-feature.md#how-to-declare-type-supported).
+
+### hbase_extra_config [config]
+
+The extra configuration of hbase
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
+
+## Examples
+
+```bash
+source {
+ Hbase {
+ zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+ table = "seatunnel_test"
+ query_columns=["rowkey", "columnFamily1:column1",
"columnFamily1:column2", "columnFamily2:column1"]
+ schema = {
+ columns = [
+ {
+ name = rowkey
+ type = string
+ },
+ {
+ name = "columnFamily1:column1"
+ type = boolean
+ },
+ {
+ name = "columnFamily1:column2"
+ type = double
+ },
+ {
+ name = "columnFamily2:column1"
+ type = bigint
+ }
+ ]
+ }
+ }
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Hbase Source Connector
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index c880a8fdf2..314d453ffc 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -110,6 +110,7 @@ seatunnel.source.TDengine = connector-tdengine
seatunnel.sink.TDengine = connector-tdengine
seatunnel.source.Persistiq = connector-http-persistiq
seatunnel.sink.SelectDBCloud = connector-selectdb-cloud
+seatunnel.source.Hbase = connector-hbase
seatunnel.sink.Hbase = connector-hbase
seatunnel.source.StarRocks = connector-starrocks
seatunnel.source.Rocketmq = connector-rocketmq
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
index 605a5291fc..565f1b4b48 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
@@ -42,6 +42,12 @@ public class HbaseConfig {
.noDefaultValue()
.withDescription("Hbase rowkey column");
+ public static final Option<List<String>> QUERY_COLUMNS =
+ Options.key("query_columns")
+ .listType()
+ .noDefaultValue()
+ .withDescription("query Hbase columns");
+
public static final Option<String> ROWKEY_DELIMITER =
Options.key("rowkey_delimiter")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index fe32301f6b..858030fe2a 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -32,6 +32,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
@@ -50,6 +51,8 @@ public class HbaseParameters implements Serializable {
private List<String> rowkeyColumns;
+ private List<String> columns;
+
private Map<String, String> familyNames;
private String versionColumn;
@@ -103,4 +106,19 @@ public class HbaseParameters implements Serializable {
}
return builder.build();
}
+
+ public static HbaseParameters buildWithSinkConfig(Config pluginConfig) {
+ HbaseParametersBuilder builder = HbaseParameters.builder();
+
+ // required parameters
+
builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
+ builder.table(pluginConfig.getString(TABLE.key()));
+ builder.columns(pluginConfig.getStringList(QUERY_COLUMNS.key()));
+
+ if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
+ Config extraConfig =
pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
+
builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig));
+ }
+ return builder.build();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
new file mode 100644
index 0000000000..5717c933b0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum HbaseConnectorErrorCode implements SeaTunnelErrorCode {
+ CONNECTION_FAILED("Hbase-01", "Build Hbase connection failed");
+
+ private final String code;
+ private final String description;
+
+ HbaseConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
new file mode 100644
index 0000000000..8d7a1bcbe1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.format;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
+import org.apache.seatunnel.common.utils.TimeUtils;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+
+import static
org.apache.seatunnel.common.utils.DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+
+public class HBaseDeserializationFormat {
+
+ private final DateUtils.Formatter dateFormat =
DateUtils.Formatter.YYYY_MM_DD;
+ private final DateTimeUtils.Formatter datetimeFormat = YYYY_MM_DD_HH_MM_SS;
+ private final TimeUtils.Formatter timeFormat =
TimeUtils.Formatter.HH_MM_SS;
+
+ public SeaTunnelRow deserialize(byte[][] rowCell, SeaTunnelRowType
seaTunnelRowType) {
+ SeaTunnelRow row = new SeaTunnelRow(seaTunnelRowType.getTotalFields());
+ for (int i = 0; i < row.getArity(); i++) {
+ SeaTunnelDataType<?> fieldType = seaTunnelRowType.getFieldType(i);
+ row.setField(i, deserializeValue(fieldType, rowCell[i]));
+ }
+ return row;
+ }
+
+ private Object deserializeValue(SeaTunnelDataType<?> typeInfo, byte[]
cell) {
+ if (cell == null) {
+ return null;
+ }
+
+ switch (typeInfo.getSqlType()) {
+ case TINYINT:
+ case SMALLINT:
+ case INT:
+ return Bytes.toInt(cell);
+ case BOOLEAN:
+ return Bytes.toBoolean(cell);
+ case BIGINT:
+ return Bytes.toLong(cell);
+ case FLOAT:
+ case DECIMAL:
+ return Bytes.toFloat(cell);
+ case DOUBLE:
+ return Bytes.toDouble(cell);
+ case BYTES:
+ return cell;
+ case DATE:
+ return LocalDate.parse(
+ Bytes.toString(cell),
DateTimeFormatter.ofPattern(dateFormat.getValue()));
+ case TIME:
+ return LocalTime.parse(
+ Bytes.toString(cell),
DateTimeFormatter.ofPattern(timeFormat.getValue()));
+ case TIMESTAMP:
+ return LocalDateTime.parse(
+ Bytes.toString(cell),
+
DateTimeFormatter.ofPattern(datetimeFormat.getValue()));
+ case STRING:
+ return Bytes.toString(cell);
+ default:
+ throw new HbaseConnectorException(
+ CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+ "Unsupported data type " + typeInfo.getSqlType());
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
new file mode 100644
index 0000000000..869e33f623
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
+
+public class HbaseSource
+ implements SeaTunnelSource<SeaTunnelRow, HbaseSourceSplit,
HbaseSourceState>,
+ SupportParallelism,
+ SupportColumnProjection {
+ private static final Logger LOG =
LoggerFactory.getLogger(HbaseSource.class);
+ public static final String PLUGIN_NAME = "Hbase";
+ private Config pluginConfig;
+ private SeaTunnelRowType seaTunnelRowType;
+ private HbaseParameters hbaseParameters;
+
+ private CatalogTable catalogTable;
+
+ @Override
+ public String getPluginName() {
+ return PLUGIN_NAME;
+ }
+
+ HbaseSource(Config pluginConfig) {
+ this.pluginConfig = pluginConfig;
+ CheckResult result =
+ CheckConfigUtil.checkAllExists(
+ pluginConfig, ZOOKEEPER_QUORUM.key(), TABLE.key(),
QUERY_COLUMNS.key());
+ if (!result.isSuccess()) {
+ throw new HbaseConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SOURCE,
result.getMsg()));
+ }
+ this.hbaseParameters =
HbaseParameters.buildWithSinkConfig(pluginConfig);
+ this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Lists.newArrayList(catalogTable);
+ }
+
+ @Override
+ public SourceReader<SeaTunnelRow, HbaseSourceSplit> createReader(
+ SourceReader.Context readerContext) throws Exception {
+ return new HbaseSourceReader(hbaseParameters, readerContext,
seaTunnelRowType);
+ }
+
+ @Override
+ public SourceSplitEnumerator<HbaseSourceSplit, HbaseSourceState>
createEnumerator(
+ SourceSplitEnumerator.Context<HbaseSourceSplit> enumeratorContext)
throws Exception {
+ return new HbaseSourceSplitEnumerator(enumeratorContext,
hbaseParameters);
+ }
+
+ @Override
+ public SourceSplitEnumerator<HbaseSourceSplit, HbaseSourceState>
restoreEnumerator(
+ SourceSplitEnumerator.Context<HbaseSourceSplit> enumeratorContext,
+ HbaseSourceState checkpointState)
+ throws Exception {
+ return new HbaseSourceSplitEnumerator(enumeratorContext,
hbaseParameters, checkpointState);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
new file mode 100644
index 0000000000..4eec3e0048
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+import static
org.apache.seatunnel.connectors.seatunnel.hbase.sink.HbaseSinkFactory.IDENTIFIER;
+
+@AutoService(Factory.class)
+public class HbaseSourceFactory implements TableSourceFactory {
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(HbaseConfig.ZOOKEEPER_QUORUM)
+ .required(HbaseConfig.TABLE)
+ .required(HbaseConfig.QUERY_COLUMNS)
+ .build();
+ }
+
+ @Override
+ public Class<? extends SeaTunnelSource> getSourceClass() {
+ return HbaseSource.class;
+ }
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ return () ->
+ (SeaTunnelSource<T, SplitT, StateT>)
+ new HbaseSource(context.getOptions().toConfig());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
new file mode 100644
index 0000000000..556374844e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.format.HBaseDeserializationFormat;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.utils.HbaseConnectionUtil;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+@Slf4j
+public class HbaseSourceReader implements SourceReader<SeaTunnelRow,
HbaseSourceSplit> {
+ private static final String ROW_KEY = "rowkey";
+ private final Deque<HbaseSourceSplit> sourceSplits = new
ConcurrentLinkedDeque<>();
+
+ private final transient Map<String, byte[][]> namesMap;
+
+ private final Set<String> columnFamilies = new LinkedHashSet<>();
+ private final SourceReader.Context context;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private volatile boolean noMoreSplit = false;
+
+ private HbaseParameters hbaseParameters;
+ private final List<String> columnNames;
+ private final transient Connection connection;
+
+ private HBaseDeserializationFormat hbaseDeserializationFormat =
+ new HBaseDeserializationFormat();
+ private ResultScanner currentScanner;
+
+ public HbaseSourceReader(
+ HbaseParameters hbaseParameters, Context context, SeaTunnelRowType
seaTunnelRowType) {
+ this.hbaseParameters = hbaseParameters;
+ this.context = context;
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.namesMap = Maps.newConcurrentMap();
+
+ this.columnNames = hbaseParameters.getColumns();
+ // Check if input column names are in format: [ columnFamily:column ].
+ this.columnNames.stream()
+ .peek(
+ column ->
+ Preconditions.checkArgument(
+ (column.contains(":") &&
column.split(":").length == 2)
+ ||
this.ROW_KEY.equalsIgnoreCase(column),
+ "Invalid column names, it should be
[ColumnFamily:Column] format"))
+ .forEach(column ->
this.columnFamilies.add(column.split(":")[0]));
+
+ connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+ }
+
+ @Override
+ public void open() throws Exception {
+ // do nothing
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.currentScanner != null) {
+ try {
+ this.currentScanner.close();
+ } catch (Exception e) {
+ throw new IOException("Failed to close HBase Scanner.", e);
+ }
+ }
+ if (this.connection != null) {
+ try {
+ this.connection.close();
+ } catch (Exception e) {
+ throw new IOException("Failed to close HBase connection.", e);
+ }
+ log.info("Current HBase connection is closed.");
+ }
+ }
+
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+ synchronized (output.getCheckpointLock()) {
+ final HbaseSourceSplit split = sourceSplits.poll();
+ if (Objects.nonNull(split)) {
+ // read logic
+ if (this.currentScanner == null) {
+ Scan scan = new Scan();
+ scan.withStartRow(split.getStartRow(), true);
+ scan.withStopRow(split.getEndRow(), true);
+ this.currentScanner =
+ this.connection
+
.getTable(TableName.valueOf(hbaseParameters.getTable()))
+ .getScanner(scan);
+ }
+ for (Result result : currentScanner) {
+ SeaTunnelRow seaTunnelRow =
+ hbaseDeserializationFormat.deserialize(
+ convertRawRow(result), seaTunnelRowType);
+ output.collect(seaTunnelRow);
+ }
+ } else if (noMoreSplit && sourceSplits.isEmpty()) {
+ // signal to the source that we have reached the end of the
data.
+ log.info("Closed the bounded Hbase source");
+ context.signalNoMoreElement();
+ } else {
+ log.warn("Waiting for Hbase split, sleeping 1s");
+ Thread.sleep(1000L);
+ }
+ }
+ }
+
+ private byte[][] convertRawRow(Result result) {
+ String[] fieldNames = seaTunnelRowType.getFieldNames();
+ byte[][] rawRow = new byte[fieldNames.length][];
+ for (int i = 0; i < fieldNames.length; ++i) {
+ String columnName = fieldNames[i];
+ byte[] bytes;
+ try {
+ // handle rowkey column
+ if (this.ROW_KEY.equals(columnName)) {
+ bytes = result.getRow();
+ } else {
+ byte[][] arr = this.namesMap.get(columnName);
+ // Deduplicate
+ if (Objects.isNull(arr)) {
+ arr = new byte[2][];
+ String[] arr1 = columnName.split(":");
+ arr[0] =
arr1[0].trim().getBytes(StandardCharsets.UTF_8);
+ arr[1] =
arr1[1].trim().getBytes(StandardCharsets.UTF_8);
+ this.namesMap.put(columnName, arr);
+ }
+ bytes = result.getValue(arr[0], arr[1]);
+ }
+ rawRow[i] = bytes;
+ } catch (Exception e) {
+ log.error(
+ "Cannot read data from {}, reason: \n",
this.hbaseParameters.getTable(), e);
+ }
+ }
+ return rawRow;
+ }
+
+ @Override
+ public List<HbaseSourceSplit> snapshotState(long checkpointId) {
+ return new ArrayList<>(sourceSplits);
+ }
+
+ @Override
+ public void addSplits(List<HbaseSourceSplit> splits) {
+ sourceSplits.addAll(splits);
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+ noMoreSplit = true;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {}
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplit.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplit.java
new file mode 100644
index 0000000000..1d38ddd116
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplit.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+public class HbaseSourceSplit implements SourceSplit {
+ public static final String HBASE_SOURCE_SPLIT_PREFIX =
"hbase_source_split_";
+ private String splitId;
+ private byte[] startRow;
+ private byte[] endRow;
+
+ public HbaseSourceSplit(int splitId) {
+ this.splitId = HBASE_SOURCE_SPLIT_PREFIX + splitId;
+ }
+
+ public HbaseSourceSplit(int splitId, byte[] startRow, byte[] endRow) {
+ this.splitId = HBASE_SOURCE_SPLIT_PREFIX + splitId;
+ this.startRow = startRow;
+ this.endRow = endRow;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("{\"split_id\":\"%s\"}", splitId);
+ }
+
+ @Override
+ public String splitId() {
+ return splitId;
+ }
+
+ public byte[] getStartRow() {
+ return startRow;
+ }
+
+ public byte[] getEndRow() {
+ return endRow;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
new file mode 100644
index 0000000000..094128b174
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.utils.HbaseConnectionUtil;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class HbaseSourceSplitEnumerator
+ implements SourceSplitEnumerator<HbaseSourceSplit, HbaseSourceState> {
+ /** Source split enumerator context */
+ private final Context<HbaseSourceSplit> context;
+
+ private Config pluginConfig;
+ /** The splits that has assigned */
+ private final Set<HbaseSourceSplit> assignedSplit;
+
+ /** The splits that have not assigned */
+ private Set<HbaseSourceSplit> pendingSplit;
+
+ private HbaseParameters hbaseParameters;
+ private Connection connection;
+
+ public HbaseSourceSplitEnumerator(
+ Context<HbaseSourceSplit> context, HbaseParameters
hbaseParameters) {
+ this.context = context;
+ this.hbaseParameters = hbaseParameters;
+ this.assignedSplit = new HashSet<>();
+ connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+ }
+
+ public HbaseSourceSplitEnumerator(
+ Context<HbaseSourceSplit> context,
+ HbaseParameters hbaseParameters,
+ HbaseSourceState sourceState) {
+ this.context = context;
+ this.hbaseParameters = hbaseParameters;
+ this.assignedSplit = sourceState.getAssignedSplits();
+ connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+ }
+
+ @Override
+ public void open() {
+ this.pendingSplit = new HashSet<>();
+ }
+
+ @Override
+ public void run() throws Exception {
+ // do nothing
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+
+ @Override
+ public void addSplitsBack(List<HbaseSourceSplit> splits, int subtaskId) {
+ if (!splits.isEmpty()) {
+ pendingSplit.addAll(splits);
+ assignSplit(subtaskId);
+ }
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return pendingSplit.size();
+ }
+
+ @Override
+ public void registerReader(int subtaskId) {
+ pendingSplit = getTableSplits();
+ assignSplit(subtaskId);
+ }
+
+ @Override
+ public HbaseSourceState snapshotState(long checkpointId) throws Exception {
+ return new HbaseSourceState(assignedSplit);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // do nothing
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {
+ // do nothing
+ }
+
+ /** Assign split by reader task id */
+ private void assignSplit(int taskId) {
+ ArrayList<HbaseSourceSplit> currentTaskSplits = new ArrayList<>();
+ if (context.currentParallelism() == 1) {
+ // if parallelism == 1, we should assign all the splits to reader
+ currentTaskSplits.addAll(pendingSplit);
+ } else {
+ // if parallelism > 1, according to hashCode of split's id to
determine whether to
+ // allocate the current task
+ for (HbaseSourceSplit sourceSplit : pendingSplit) {
+ final int splitOwner =
+ getSplitOwner(sourceSplit.splitId(),
context.currentParallelism());
+ if (splitOwner == taskId) {
+ currentTaskSplits.add(sourceSplit);
+ }
+ }
+ }
+ // assign splits
+ context.assignSplit(taskId, currentTaskSplits);
+ // save the state of assigned splits
+ assignedSplit.addAll(currentTaskSplits);
+ // remove the assigned splits from pending splits
+ currentTaskSplits.forEach(split -> pendingSplit.remove(split));
+ log.info(
+ "SubTask {} is assigned to [{}]",
+ taskId,
+ currentTaskSplits.stream()
+ .map(HbaseSourceSplit::splitId)
+ .collect(Collectors.joining(",")));
+ context.signalNoMoreSplits(taskId);
+ }
+
+ /** Get all splits of table */
+ private Set<HbaseSourceSplit> getTableSplits() {
+ List<HbaseSourceSplit> splits = new ArrayList<>();
+
+ try {
+ RegionLocator regionLocator =
+
connection.getRegionLocator(TableName.valueOf(hbaseParameters.getTable()));
+ byte[][] startKeys = regionLocator.getStartKeys();
+ byte[][] endKeys = regionLocator.getEndKeys();
+ if (startKeys.length != endKeys.length) {
+ throw new IOException(
+ "Failed to create Splits for HBase table {}. HBase
start keys and end keys not equal."
+ + hbaseParameters.getTable());
+ }
+
+ int i = 0;
+ while (i < startKeys.length) {
+ splits.add(new HbaseSourceSplit(i, startKeys[i], endKeys[i]));
+ i++;
+ }
+ return new HashSet<>(splits);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Hash algorithm for assigning splits to readers */
+ private static int getSplitOwner(String tp, int numReaders) {
+ return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceState.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceState.java
new file mode 100644
index 0000000000..f2a79dd568
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import java.io.Serializable;
+import java.util.Set;
+
/**
 * Serializable checkpoint state of the HBase source enumerator: the set of splits
 * already assigned to readers. On restore, the enumerator is reconstructed with
 * this set so previously assigned splits are not handed out again.
 */
public class HbaseSourceState implements Serializable {
    private static final long serialVersionUID = 1L;

    // Splits that have already been assigned to readers at checkpoint time.
    private final Set<HbaseSourceSplit> assignedSplits;

    public HbaseSourceState(Set<HbaseSourceSplit> assignedSplits) {
        this.assignedSplits = assignedSplits;
    }

    public Set<HbaseSourceSplit> getAssignedSplits() {
        return assignedSplits;
    }
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
new file mode 100644
index 0000000000..f006986e66
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.utils;
+
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+
+import java.io.IOException;
+
+public class HbaseConnectionUtil {
+ public static Connection getHbaseConnection(HbaseParameters
hbaseParameters) {
+ Configuration hbaseConfiguration = HBaseConfiguration.create();
+ hbaseConfiguration.set("hbase.zookeeper.quorum",
hbaseParameters.getZookeeperQuorum());
+ if (hbaseParameters.getHbaseExtraConfig() != null) {
+
hbaseParameters.getHbaseExtraConfig().forEach(hbaseConfiguration::set);
+ }
+ // initialize hbase connection
+ try {
+ Connection connection =
ConnectionFactory.createConnection(hbaseConfiguration);
+ return connection;
+ } catch (IOException e) {
+ String errorMsg = "Build Hbase connection failed.";
+ throw new
HbaseConnectorException(HbaseConnectorErrorCode.CONNECTION_FAILED, errorMsg);
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index 4c7a9587ea..e27e5c715e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -147,6 +147,12 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(results.size(), 5);
}
+ @TestTemplate
+ public void testHbaseSource(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/hbase-to-assert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
@TestTemplate
public void testHbaseSinkWithArray(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
new file mode 100644
index 0000000000..f209875745
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hbase {
+ zookeeper_quorum = "hbase-e2e:2181"
+ table = "seatunnel_test"
+ query_columns=["rowkey", "cf1:col1", "cf1:col2", "cf2:col1", "cf2:col2"]
+ schema = {
+ columns = [
+ {
+ name = rowkey
+ type = string
+ },
+ {
+ name = "cf1:col1"
+ type = boolean
+ },
+ {
+ name = "cf1:col2"
+ type = double
+ },
+ {
+ name = "cf2:col1"
+ type = bigint
+ },
+ {
+ name = "cf2:col2"
+ type = int
+ }
+ ]
+ }
+ result_table_name = hbase_source
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = hbase_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 10000
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 10000
+ }
+ ],
+ field_rules = [
+ {
+ field_name = rowkey
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = "cf1:col1"
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = "cf1:col2"
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = "cf2:col1"
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = "cf2:col2"
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file