This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f108a5e658 [Connector]Add hbase source connector (#6348)
f108a5e658 is described below

commit f108a5e658b261aa049e688264dcc50787611761
Author: TaoZex <[email protected]>
AuthorDate: Wed May 15 15:48:08 2024 +0800

    [Connector]Add hbase source connector (#6348)
---
 .../connector-v2/Error-Quick-Reference-Manual.md   |   6 +
 docs/en/connector-v2/source/Hbase.md               |  91 ++++++++++
 plugin-mapping.properties                          |   1 +
 .../seatunnel/hbase/config/HbaseConfig.java        |   6 +
 .../seatunnel/hbase/config/HbaseParameters.java    |  18 ++
 .../hbase/exception/HbaseConnectorErrorCode.java   |  43 +++++
 .../hbase/format/HBaseDeserializationFormat.java   |  93 ++++++++++
 .../seatunnel/hbase/source/HbaseSource.java        | 114 ++++++++++++
 .../seatunnel/hbase/source/HbaseSourceFactory.java |  64 +++++++
 .../seatunnel/hbase/source/HbaseSourceReader.java  | 195 +++++++++++++++++++++
 .../seatunnel/hbase/source/HbaseSourceSplit.java   |  56 ++++++
 .../hbase/source/HbaseSourceSplitEnumerator.java   | 185 +++++++++++++++++++
 .../seatunnel/hbase/source/HbaseSourceState.java   |  36 ++++
 .../seatunnel/hbase/utils/HbaseConnectionUtil.java |  48 +++++
 .../seatunnel/e2e/connector/hbase/HbaseIT.java     |   6 +
 .../src/test/resources/hbase-to-assert.conf        | 119 +++++++++++++
 16 files changed, 1081 insertions(+)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md 
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 2519bfbe65..960bddc0eb 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -256,6 +256,12 @@ problems encountered by users.
 
|--------------|-------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------|
 | FIRESTORE-01 | Close Firestore client failed | When users encounter this 
error code, it is usually because there are some problems with closing the Firestore 
client, please check that Firestore is working |
 
+## Hbase Connector Error Codes
+
+|   code   |          description          |                                   
                         solution                                               
              |
+|----------|-------------------------------|---------------------------------------------------------------------------------------------------------------------------------|
+| Hbase-01 | Build hbase connection failed | When users encounter this error 
code, the Hbase database connection could not be created. Check the Hbase configuration parameters 
used and try again |
+
 ## FilterFieldTransform Error Codes
 
 |           code            |      description       |        solution         
|
diff --git a/docs/en/connector-v2/source/Hbase.md 
b/docs/en/connector-v2/source/Hbase.md
new file mode 100644
index 0000000000..677b827fb2
--- /dev/null
+++ b/docs/en/connector-v2/source/Hbase.md
@@ -0,0 +1,91 @@
+# Hbase
+
+> Hbase source connector
+
+## Description
+
+Read data from Apache Hbase.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+|        name        |  type  | required | default value |
+|--------------------|--------|----------|---------------|
+| zookeeper_quorum   | string | yes      | -             |
+| table              | string | yes      | -             |
+| query_columns      | list   | yes      | -             |
+| schema             | config | yes      | -             |
+| hbase_extra_config | config | no       | -             |
+| common-options     |        | no       | -             |
+
+### zookeeper_quorum [string]
+
+The zookeeper cluster host of hbase, example: 
"hadoop001:2181,hadoop002:2181,hadoop003:2181"
+
+### table [string]
+
+The table name you want to read, example: "seatunnel"
+
+### query_columns [list]
+
+The column name which you want to query in the table. If you want to query the 
rowkey column, please set "rowkey" in query_columns.
+Other column format should be: columnFamily:columnName, example: ["rowkey", 
"columnFamily1:column1", "columnFamily1:column2", "columnFamily2:column1"]
+
+### schema [config]
+
+Hbase uses byte arrays for storage. Therefore, you need to configure data 
types for each column in a table. For more information, see: 
[guide](../../concept/schema-feature.md#how-to-declare-type-supported).
+
+### hbase_extra_config [config]
+
+The extra configuration of hbase
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details
+
+## Examples
+
+```hocon
+source {
+  Hbase {
+      zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+      table = "seatunnel_test"
+      query_columns=["rowkey", "columnFamily1:column1", 
"columnFamily1:column2", "columnFamily2:column1"]
+      schema = {
+            columns = [
+                  {
+                     name = rowkey
+                     type = string
+                  },
+                  {
+                     name = "columnFamily1:column1"
+                     type = boolean
+                  },
+                  {
+                     name = "columnFamily1:column2"
+                     type = double
+                  },
+                  {
+                     name = "columnFamily2:column1"
+                     type = bigint
+                  }
+            ]
+      }
+  }
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Hbase Source Connector
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index c880a8fdf2..314d453ffc 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -110,6 +110,7 @@ seatunnel.source.TDengine = connector-tdengine
 seatunnel.sink.TDengine = connector-tdengine
 seatunnel.source.Persistiq = connector-http-persistiq
 seatunnel.sink.SelectDBCloud = connector-selectdb-cloud
+seatunnel.source.Hbase = connector-hbase
 seatunnel.sink.Hbase = connector-hbase
 seatunnel.source.StarRocks = connector-starrocks
 seatunnel.source.Rocketmq = connector-rocketmq
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
index 605a5291fc..565f1b4b48 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
@@ -42,6 +42,12 @@ public class HbaseConfig {
                     .noDefaultValue()
                     .withDescription("Hbase rowkey column");
 
+    public static final Option<List<String>> QUERY_COLUMNS =
+            Options.key("query_columns")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription("query Hbase columns");
+
     public static final Option<String> ROWKEY_DELIMITER =
             Options.key("rowkey_delimiter")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index fe32301f6b..858030fe2a 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -32,6 +32,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
@@ -50,6 +51,8 @@ public class HbaseParameters implements Serializable {
 
     private List<String> rowkeyColumns;
 
+    private List<String> columns;
+
     private Map<String, String> familyNames;
 
     private String versionColumn;
@@ -103,4 +106,19 @@ public class HbaseParameters implements Serializable {
         }
         return builder.build();
     }
+
+    public static HbaseParameters buildWithSinkConfig(Config pluginConfig) {
+        HbaseParametersBuilder builder = HbaseParameters.builder();
+
+        // required parameters
+        
builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
+        builder.table(pluginConfig.getString(TABLE.key()));
+        builder.columns(pluginConfig.getStringList(QUERY_COLUMNS.key()));
+
+        if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
+            Config extraConfig = 
pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
+            
builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig));
+        }
+        return builder.build();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
new file mode 100644
index 0000000000..5717c933b0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/exception/HbaseConnectorErrorCode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum HbaseConnectorErrorCode implements SeaTunnelErrorCode {
+    CONNECTION_FAILED("Hbase-01", "Build Hbase connection failed");
+
+    private final String code;
+    private final String description;
+
+    HbaseConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
new file mode 100644
index 0000000000..8d7a1bcbe1
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.format;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
+import org.apache.seatunnel.common.utils.TimeUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+
+import static 
org.apache.seatunnel.common.utils.DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+
+public class HBaseDeserializationFormat {
+
+    private final DateUtils.Formatter dateFormat = 
DateUtils.Formatter.YYYY_MM_DD;
+    private final DateTimeUtils.Formatter datetimeFormat = YYYY_MM_DD_HH_MM_SS;
+    private final TimeUtils.Formatter timeFormat = 
TimeUtils.Formatter.HH_MM_SS;
+
+    public SeaTunnelRow deserialize(byte[][] rowCell, SeaTunnelRowType 
seaTunnelRowType) {
+        SeaTunnelRow row = new SeaTunnelRow(seaTunnelRowType.getTotalFields());
+        for (int i = 0; i < row.getArity(); i++) {
+            SeaTunnelDataType<?> fieldType = seaTunnelRowType.getFieldType(i);
+            row.setField(i, deserializeValue(fieldType, rowCell[i]));
+        }
+        return row;
+    }
+
+    private Object deserializeValue(SeaTunnelDataType<?> typeInfo, byte[] 
cell) {
+        if (cell == null) {
+            return null;
+        }
+
+        switch (typeInfo.getSqlType()) {
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+                return Bytes.toInt(cell);
+            case BOOLEAN:
+                return Bytes.toBoolean(cell);
+            case BIGINT:
+                return Bytes.toLong(cell);
+            case FLOAT:
+            case DECIMAL:
+                return Bytes.toFloat(cell);
+            case DOUBLE:
+                return Bytes.toDouble(cell);
+            case BYTES:
+                return cell;
+            case DATE:
+                return LocalDate.parse(
+                        Bytes.toString(cell), 
DateTimeFormatter.ofPattern(dateFormat.getValue()));
+            case TIME:
+                return LocalTime.parse(
+                        Bytes.toString(cell), 
DateTimeFormatter.ofPattern(timeFormat.getValue()));
+            case TIMESTAMP:
+                return LocalDateTime.parse(
+                        Bytes.toString(cell),
+                        
DateTimeFormatter.ofPattern(datetimeFormat.getValue()));
+            case STRING:
+                return Bytes.toString(cell);
+            default:
+                throw new HbaseConnectorException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                        "Unsupported data type " + typeInfo.getSqlType());
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
new file mode 100644
index 0000000000..869e33f623
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
+
+public class HbaseSource
+        implements SeaTunnelSource<SeaTunnelRow, HbaseSourceSplit, 
HbaseSourceState>,
+                SupportParallelism,
+                SupportColumnProjection {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HbaseSource.class);
+    public static final String PLUGIN_NAME = "Hbase";
+    private Config pluginConfig;
+    private SeaTunnelRowType seaTunnelRowType;
+    private HbaseParameters hbaseParameters;
+
+    private CatalogTable catalogTable;
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    HbaseSource(Config pluginConfig) {
+        this.pluginConfig = pluginConfig;
+        CheckResult result =
+                CheckConfigUtil.checkAllExists(
+                        pluginConfig, ZOOKEEPER_QUORUM.key(), TABLE.key(), 
QUERY_COLUMNS.key());
+        if (!result.isSuccess()) {
+            throw new HbaseConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
+        }
+        this.hbaseParameters = 
HbaseParameters.buildWithSinkConfig(pluginConfig);
+        this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Lists.newArrayList(catalogTable);
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, HbaseSourceSplit> createReader(
+            SourceReader.Context readerContext) throws Exception {
+        return new HbaseSourceReader(hbaseParameters, readerContext, 
seaTunnelRowType);
+    }
+
+    @Override
+    public SourceSplitEnumerator<HbaseSourceSplit, HbaseSourceState> 
createEnumerator(
+            SourceSplitEnumerator.Context<HbaseSourceSplit> enumeratorContext) 
throws Exception {
+        return new HbaseSourceSplitEnumerator(enumeratorContext, 
hbaseParameters);
+    }
+
+    @Override
+    public SourceSplitEnumerator<HbaseSourceSplit, HbaseSourceState> 
restoreEnumerator(
+            SourceSplitEnumerator.Context<HbaseSourceSplit> enumeratorContext,
+            HbaseSourceState checkpointState)
+            throws Exception {
+        return new HbaseSourceSplitEnumerator(enumeratorContext, 
hbaseParameters, checkpointState);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
new file mode 100644
index 0000000000..4eec3e0048
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.sink.HbaseSinkFactory.IDENTIFIER;
+
+@AutoService(Factory.class)
+public class HbaseSourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(HbaseConfig.ZOOKEEPER_QUORUM)
+                .required(HbaseConfig.TABLE)
+                .required(HbaseConfig.QUERY_COLUMNS)
+                .build();
+    }
+
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return HbaseSource.class;
+    }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new HbaseSource(context.getOptions().toConfig());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
new file mode 100644
index 0000000000..556374844e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.format.HBaseDeserializationFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.utils.HbaseConnectionUtil;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+@Slf4j
+public class HbaseSourceReader implements SourceReader<SeaTunnelRow, 
HbaseSourceSplit> {
+    private static final String ROW_KEY = "rowkey";
+    private final Deque<HbaseSourceSplit> sourceSplits = new 
ConcurrentLinkedDeque<>();
+
+    private final transient Map<String, byte[][]> namesMap;
+
+    private final Set<String> columnFamilies = new LinkedHashSet<>();
+    private final SourceReader.Context context;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private volatile boolean noMoreSplit = false;
+
+    private HbaseParameters hbaseParameters;
+    private final List<String> columnNames;
+    private final transient Connection connection;
+
+    private HBaseDeserializationFormat hbaseDeserializationFormat =
+            new HBaseDeserializationFormat();
+    private ResultScanner currentScanner;
+
+    public HbaseSourceReader(
+            HbaseParameters hbaseParameters, Context context, SeaTunnelRowType 
seaTunnelRowType) {
+        this.hbaseParameters = hbaseParameters;
+        this.context = context;
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.namesMap = Maps.newConcurrentMap();
+
+        this.columnNames = hbaseParameters.getColumns();
+        // Check if input column names are in format: [ columnFamily:column ].
+        this.columnNames.stream()
+                .peek(
+                        column ->
+                                Preconditions.checkArgument(
+                                        (column.contains(":") && 
column.split(":").length == 2)
+                                                || 
this.ROW_KEY.equalsIgnoreCase(column),
+                                        "Invalid column names, it should be 
[ColumnFamily:Column] format"))
+                .forEach(column -> 
this.columnFamilies.add(column.split(":")[0]));
+
+        connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+    }
+
+    @Override
+    public void open() throws Exception {
+        // do nothing
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (this.currentScanner != null) {
+            try {
+                this.currentScanner.close();
+            } catch (Exception e) {
+                throw new IOException("Failed to close HBase Scanner.", e);
+            }
+        }
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+            } catch (Exception e) {
+                throw new IOException("Failed to close HBase connection.", e);
+            }
+            log.info("Current HBase connection is closed.");
+        }
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        synchronized (output.getCheckpointLock()) {
+            final HbaseSourceSplit split = sourceSplits.poll();
+            if (Objects.nonNull(split)) {
+                // read logic
+                if (this.currentScanner == null) {
+                    Scan scan = new Scan();
+                    scan.withStartRow(split.getStartRow(), true);
+                    scan.withStopRow(split.getEndRow(), true);
+                    this.currentScanner =
+                            this.connection
+                                    
.getTable(TableName.valueOf(hbaseParameters.getTable()))
+                                    .getScanner(scan);
+                }
+                for (Result result : currentScanner) {
+                    SeaTunnelRow seaTunnelRow =
+                            hbaseDeserializationFormat.deserialize(
+                                    convertRawRow(result), seaTunnelRowType);
+                    output.collect(seaTunnelRow);
+                }
+            } else if (noMoreSplit && sourceSplits.isEmpty()) {
+                // signal to the source that we have reached the end of the 
data.
+                log.info("Closed the bounded Hbase source");
+                context.signalNoMoreElement();
+            } else {
+                log.warn("Waiting for Hbase split, sleeping 1s");
+                Thread.sleep(1000L);
+            }
+        }
+    }
+
+    private byte[][] convertRawRow(Result result) {
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        byte[][] rawRow = new byte[fieldNames.length][];
+        for (int i = 0; i < fieldNames.length; ++i) {
+            String columnName = fieldNames[i];
+            byte[] bytes;
+            try {
+                // handle rowkey column
+                if (this.ROW_KEY.equals(columnName)) {
+                    bytes = result.getRow();
+                } else {
+                    byte[][] arr = this.namesMap.get(columnName);
+                    // Deduplicate
+                    if (Objects.isNull(arr)) {
+                        arr = new byte[2][];
+                        String[] arr1 = columnName.split(":");
+                        arr[0] = 
arr1[0].trim().getBytes(StandardCharsets.UTF_8);
+                        arr[1] = 
arr1[1].trim().getBytes(StandardCharsets.UTF_8);
+                        this.namesMap.put(columnName, arr);
+                    }
+                    bytes = result.getValue(arr[0], arr[1]);
+                }
+                rawRow[i] = bytes;
+            } catch (Exception e) {
+                log.error(
+                        "Cannot read data from {}, reason: \n", 
this.hbaseParameters.getTable(), e);
+            }
+        }
+        return rawRow;
+    }
+
+    @Override
+    public List<HbaseSourceSplit> snapshotState(long checkpointId) {
+        return new ArrayList<>(sourceSplits);
+    }
+
+    @Override
+    public void addSplits(List<HbaseSourceSplit> splits) {
+        sourceSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {}
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplit.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplit.java
new file mode 100644
index 0000000000..1d38ddd116
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplit.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+/** A {@link SourceSplit} describing one scan range of an HBase table, bounded by row keys. */
+public class HbaseSourceSplit implements SourceSplit {
+
+    public static final String HBASE_SOURCE_SPLIT_PREFIX = "hbase_source_split_";
+
+    private final String splitId;
+    private final byte[] startRow;
+    private final byte[] endRow;
+
+    /** Creates a split with no row-key bounds. */
+    public HbaseSourceSplit(int splitId) {
+        this(splitId, null, null);
+    }
+
+    /** Creates a split covering the range between the given start and end row keys. */
+    public HbaseSourceSplit(int splitId, byte[] startRow, byte[] endRow) {
+        this.splitId = HBASE_SOURCE_SPLIT_PREFIX + splitId;
+        this.startRow = startRow;
+        this.endRow = endRow;
+    }
+
+    @Override
+    public String splitId() {
+        return splitId;
+    }
+
+    /** Start row key boundary of this split; may be null when unbounded. */
+    public byte[] getStartRow() {
+        return startRow;
+    }
+
+    /** End row key boundary of this split; may be null when unbounded. */
+    public byte[] getEndRow() {
+        return endRow;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{\"split_id\":\"%s\"}", splitId);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
new file mode 100644
index 0000000000..094128b174
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.utils.HbaseConnectionUtil;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class HbaseSourceSplitEnumerator
+        implements SourceSplitEnumerator<HbaseSourceSplit, HbaseSourceState> {
+    /** Source split enumerator context */
+    private final Context<HbaseSourceSplit> context;
+
+    /** The splits that have already been assigned to readers */
+    private final Set<HbaseSourceSplit> assignedSplit;
+
+    /** The splits that have not been assigned yet */
+    private Set<HbaseSourceSplit> pendingSplit;
+
+    /** Connector parameters (table name, zookeeper quorum, ...) */
+    private final HbaseParameters hbaseParameters;
+
+    /** HBase connection opened in the constructor and released in {@link #close()} */
+    private final Connection connection;
+
+    public HbaseSourceSplitEnumerator(
+            Context<HbaseSourceSplit> context, HbaseParameters hbaseParameters) {
+        this(context, hbaseParameters, new HashSet<>());
+    }
+
+    /** Restore constructor: resumes from the assigned-split set of a checkpointed state. */
+    public HbaseSourceSplitEnumerator(
+            Context<HbaseSourceSplit> context,
+            HbaseParameters hbaseParameters,
+            HbaseSourceState sourceState) {
+        this(context, hbaseParameters, sourceState.getAssignedSplits());
+    }
+
+    private HbaseSourceSplitEnumerator(
+            Context<HbaseSourceSplit> context,
+            HbaseParameters hbaseParameters,
+            Set<HbaseSourceSplit> assignedSplit) {
+        this.context = context;
+        this.hbaseParameters = hbaseParameters;
+        this.assignedSplit = assignedSplit;
+        this.connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
+    }
+
+    @Override
+    public void open() {
+        this.pendingSplit = new HashSet<>();
+    }
+
+    @Override
+    public void run() throws Exception {
+        // Splits are discovered and assigned in registerReader; nothing to do here.
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Release the HBase connection opened in the constructor; the original
+        // implementation leaked it (and its underlying client resources).
+        if (connection != null) {
+            connection.close();
+        }
+    }
+
+    @Override
+    public void addSplitsBack(List<HbaseSourceSplit> splits, int subtaskId) {
+        // Splits returned by a failed reader re-enter the pending pool and are
+        // re-assigned immediately.
+        if (!splits.isEmpty()) {
+            pendingSplit.addAll(splits);
+            assignSplit(subtaskId);
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        pendingSplit = getTableSplits();
+        assignSplit(subtaskId);
+    }
+
+    @Override
+    public HbaseSourceState snapshotState(long checkpointId) throws Exception {
+        return new HbaseSourceState(assignedSplit);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // do nothing
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        // do nothing
+    }
+
+    /** Assign pending splits to the reader identified by {@code taskId}. */
+    private void assignSplit(int taskId) {
+        List<HbaseSourceSplit> currentTaskSplits = new ArrayList<>();
+        if (context.currentParallelism() == 1) {
+            // if parallelism == 1, we should assign all the splits to reader
+            currentTaskSplits.addAll(pendingSplit);
+        } else {
+            // if parallelism > 1, the hashCode of the split's id determines which
+            // task owns the split
+            for (HbaseSourceSplit sourceSplit : pendingSplit) {
+                final int splitOwner =
+                        getSplitOwner(sourceSplit.splitId(), context.currentParallelism());
+                if (splitOwner == taskId) {
+                    currentTaskSplits.add(sourceSplit);
+                }
+            }
+        }
+        // assign splits
+        context.assignSplit(taskId, currentTaskSplits);
+        // save the state of assigned splits
+        assignedSplit.addAll(currentTaskSplits);
+        // remove the assigned splits from pending splits
+        pendingSplit.removeAll(currentTaskSplits);
+        log.info(
+                "SubTask {} is assigned to [{}]",
+                taskId,
+                currentTaskSplits.stream()
+                        .map(HbaseSourceSplit::splitId)
+                        .collect(Collectors.joining(",")));
+        context.signalNoMoreSplits(taskId);
+    }
+
+    /** Create one split per region of the source table. */
+    private Set<HbaseSourceSplit> getTableSplits() {
+        // RegionLocator is AutoCloseable; close it once the region keys are read.
+        try (RegionLocator regionLocator =
+                connection.getRegionLocator(TableName.valueOf(hbaseParameters.getTable()))) {
+            byte[][] startKeys = regionLocator.getStartKeys();
+            byte[][] endKeys = regionLocator.getEndKeys();
+            if (startKeys.length != endKeys.length) {
+                // Fixed: the original message kept a literal "{}" placeholder and
+                // glued the table name onto the end of the sentence.
+                throw new IOException(
+                        "Failed to create splits for HBase table "
+                                + hbaseParameters.getTable()
+                                + ": the numbers of start keys and end keys differ.");
+            }
+
+            Set<HbaseSourceSplit> splits = new HashSet<>(startKeys.length);
+            for (int i = 0; i < startKeys.length; i++) {
+                splits.add(new HbaseSourceSplit(i, startKeys[i], endKeys[i]));
+            }
+            return splits;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Hash algorithm for assigning splits to readers */
+    private static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceState.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceState.java
new file mode 100644
index 0000000000..f2a79dd568
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/** Checkpoint state of the HBase source enumerator: the splits already handed to readers. */
+public class HbaseSourceState implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    // Splits that had been assigned to readers when the snapshot was taken
+    private final Set<HbaseSourceSplit> assignedSplits;
+
+    public HbaseSourceState(Set<HbaseSourceSplit> assignedSplits) {
+        this.assignedSplits = assignedSplits;
+    }
+
+    public Set<HbaseSourceSplit> getAssignedSplits() {
+        return assignedSplits;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
new file mode 100644
index 0000000000..f006986e66
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/utils/HbaseConnectionUtil.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.utils;
+
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+
+import java.io.IOException;
+
+/** Helper for building HBase client connections from connector parameters. */
+public class HbaseConnectionUtil {
+
+    private HbaseConnectionUtil() {
+        // static utility class, no instances
+    }
+
+    /**
+     * Build an HBase {@link Connection} from the given parameters.
+     *
+     * <p>The zookeeper quorum is mandatory; any extra entries supplied via
+     * {@code HbaseParameters#getHbaseExtraConfig()} are copied into the HBase
+     * configuration verbatim.
+     *
+     * @param hbaseParameters connector parameters holding the quorum and extra config
+     * @return an open HBase connection; the caller is responsible for closing it
+     * @throws HbaseConnectorException when the connection cannot be established
+     */
+    public static Connection getHbaseConnection(HbaseParameters hbaseParameters) {
+        Configuration hbaseConfiguration = HBaseConfiguration.create();
+        hbaseConfiguration.set("hbase.zookeeper.quorum", hbaseParameters.getZookeeperQuorum());
+        if (hbaseParameters.getHbaseExtraConfig() != null) {
+            hbaseParameters.getHbaseExtraConfig().forEach(hbaseConfiguration::set);
+        }
+        // initialize hbase connection
+        try {
+            return ConnectionFactory.createConnection(hbaseConfiguration);
+        } catch (IOException e) {
+            // Chain the IOException as the cause so the root failure (e.g. an
+            // unreachable quorum) is not dropped from the stack trace.
+            String errorMsg = "Build Hbase connection failed.";
+            throw new HbaseConnectorException(
+                    HbaseConnectorErrorCode.CONNECTION_FAILED, errorMsg, e);
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index 4c7a9587ea..e27e5c715e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -147,6 +147,12 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(results.size(), 5);
     }
 
+    @TestTemplate
+    public void testHbaseSource(TestContainer container) throws IOException, InterruptedException {
+        // Run the HBase-source -> Assert-sink job and expect a clean exit code
+        Container.ExecResult execResult = container.executeJob("/hbase-to-assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
     @TestTemplate
     public void testHbaseSinkWithArray(TestContainer container)
             throws IOException, InterruptedException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
new file mode 100644
index 0000000000..f209875745
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert.conf
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hbase {
+      zookeeper_quorum = "hbase-e2e:2181"
+      table = "seatunnel_test"
+      query_columns=["rowkey", "cf1:col1", "cf1:col2", "cf2:col1", "cf2:col2"]
+      schema = {
+            columns = [
+                  {
+                     name = rowkey
+                     type = string
+                  },
+                  {
+                     name = "cf1:col1"
+                     type = boolean
+                  },
+                  {
+                     name = "cf1:col2"
+                     type = double
+                  },
+                  {
+                     name = "cf2:col1"
+                     type = bigint
+                  },
+                  {
+                     name = "cf2:col2"
+                     type = int
+                  }
+            ]
+      }
+      result_table_name = hbase_source
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = hbase_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 10000
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 10000
+        }
+      ],
+      field_rules = [
+        {
+          field_name = rowkey
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "cf1:col1"
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "cf1:col2"
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "cf2:col1"
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "cf2:col2"
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file

Reply via email to