This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d7b8f37b41 [Feature][Connector-V2] Support row range boundaries for HBaseSource (#9983)
d7b8f37b41 is described below
commit d7b8f37b41945eb413822032ef598169b6d14414
Author: xiaochen <[email protected]>
AuthorDate: Fri Oct 31 18:16:55 2025 +0800
[Feature][Connector-V2] Support row range boundaries for HBaseSource (#9983)
---
docs/en/connector-v2/source/Hbase.md | 38 ++-
docs/zh/connector-v2/source/Hbase.md | 38 ++-
.../seatunnel/hbase/config/HbaseBaseOptions.java | 10 +-
.../seatunnel/hbase/config/HbaseParameters.java | 34 +-
.../seatunnel/hbase/config/HbaseSinkOptions.java | 46 +--
.../seatunnel/hbase/config/HbaseSourceOptions.java | 50 ++-
.../hbase/source/HbaseSourceSplitEnumerator.java | 77 ++++-
.../connectors/seatunnel/hbase/util/HBaseUtil.java | 47 +++
.../source/HbaseSourceSplitEnumeratorTest.java | 379 +++++++++++++++++++++
.../seatunnel/e2e/connector/hbase/HbaseIT.java | 43 +++
.../resources/hbase-source-with-end-rowkey.conf | 70 ++++
.../resources/hbase-source-with-rowkey-range.conf | 71 ++++
.../resources/hbase-source-with-start-rowkey.conf | 70 ++++
13 files changed, 892 insertions(+), 81 deletions(-)
diff --git a/docs/en/connector-v2/source/Hbase.md b/docs/en/connector-v2/source/Hbase.md
index bbce3e0a25..e96831b70e 100644
--- a/docs/en/connector-v2/source/Hbase.md
+++ b/docs/en/connector-v2/source/Hbase.md
@@ -19,16 +19,19 @@ Reads data from Apache Hbase.
## Options
-| Name | Type | Required | Default |
-|--------------------|---------|----------|---------|
-| zookeeper_quorum | string | Yes | - |
-| table | string | Yes | - |
-| schema | config | Yes | - |
-| hbase_extra_config | string | No | - |
-| caching | int | No | -1 |
-| batch | int | No | -1 |
-| cache_blocks | boolean | No | false |
-| common-options | | No | - |
+| Name | Type | Required | Default |
+|----------------------|-----------|-----------|---------|
+| zookeeper_quorum | string | Yes | - |
+| table | string | Yes | - |
+| schema | config | Yes | - |
+| hbase_extra_config | string | No | - |
+| caching | int | No | -1 |
+| batch | int | No | -1 |
+| cache_blocks | boolean | No | false |
+| is_binary_rowkey | boolean | No | false |
+| start_rowkey | string | No | - |
+| end_rowkey | string | No | - |
+| common-options | | No | - |
### zookeeper_quorum [string]
@@ -58,6 +61,18 @@ The batch parameter sets the maximum number of columns returned per scan. This i
The cache_blocks parameter determines whether to cache data blocks during scans. By default, HBase caches data blocks during scans. Setting this to false reduces memory usage during scans. Default in SeaTunnel: false.
+### is_binary_rowkey
+
+The row key in HBase can be either a text string or binary data. In SeaTunnel, the row key is treated as a text string by default (i.e., the default value of is_binary_rowkey is false).
+
+### start_rowkey
+
+The row key at which the scan starts.
+
+### end_rowkey
+
+The row key at which the scan stops.
+
### common-options
Common parameters for Source plugins, refer to [Common Source
Options](../source-common-options.md).
@@ -72,6 +87,9 @@ source {
caching = 1000
batch = 100
cache_blocks = false
+ is_binary_rowkey = false
+ start_rowkey = "B"
+ end_rowkey = "C"
schema = {
columns = [
{
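To see what these bounds mean in practice, here is a minimal sketch against the plain HBase 2.x client API (not part of this patch; the table name is taken from the e2e configs further down, and the bounds mirror start_rowkey = "B", end_rowkey = "C" from the example above):

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class RowRangeScanSketch {
    public static void main(String[] args) throws Exception {
        try (Connection connection =
                        ConnectionFactory.createConnection(HBaseConfiguration.create());
                Table table = connection.getTable(TableName.valueOf("seatunnel_test"))) {
            // Same bounds as start_rowkey = "B", end_rowkey = "C" in the example above.
            // In the plain client API the start row is inclusive and the stop row is exclusive.
            Scan scan =
                    new Scan()
                            .withStartRow(Bytes.toBytes("B"))
                            .withStopRow(Bytes.toBytes("C"))
                            .setCaching(1000)
                            .setBatch(100)
                            .setCacheBlocks(false);
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    System.out.println(Bytes.toString(result.getRow()));
                }
            }
        }
    }
}
```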
diff --git a/docs/zh/connector-v2/source/Hbase.md b/docs/zh/connector-v2/source/Hbase.md
index 8745eb642d..2a0da09c1b 100644
--- a/docs/zh/connector-v2/source/Hbase.md
+++ b/docs/zh/connector-v2/source/Hbase.md
@@ -19,16 +19,19 @@ import ChangeLog from '../changelog/connector-hbase.md';
## 选项
-| 名称 | 类型 | 必填 | 默认值 |
-|--------------------|---------|----|-------|
-| zookeeper_quorum | string | 是 | - |
-| table | string | 是 | - |
-| schema | config | 是 | - |
-| hbase_extra_config | string | 否 | - |
-| caching | int | 否 | -1 |
-| batch | int | 否 | -1 |
-| cache_blocks | boolean | 否 | false |
-| common-options | | 否 | - |
+| 名称 | 类型 | 必填 | 默认值 |
+|----------------------|----------|----|-------|
+| zookeeper_quorum | string | 是 | - |
+| table | string | 是 | - |
+| schema | config | 是 | - |
+| hbase_extra_config | string | 否 | - |
+| caching | int | 否 | -1 |
+| batch | int | 否 | -1 |
+| cache_blocks | boolean | 否 | false |
+| is_binary_rowkey | boolean | 否 | false |
+| start_rowkey | string | 否 | - |
+| end_rowkey | string | 否 | - |
+| common-options | | 否 | - |
### zookeeper_quorum [string]
@@ -58,6 +61,18 @@ batch 参数用于设置在扫描过程中每次返回的最大列数。这对
cache_blocks 参数用于设置在扫描过程中是否缓存数据块。默认情况下,HBase 会在扫描时将数据块缓存到块缓存中。如果设置为 false,则在扫描过程中不会缓存数据块,从而减少内存的使用。在SeaTunnel中默认值为: false
+### is_binary_rowkey
+
+HBase 的行键既可以是文本字符串,也可以是二进制数据。在 SeaTunnel 中,行键默认设置为文本字符串(即 is_binary_rowkey 默认值为 false)
+
+### start_rowkey
+
+扫描起始行
+
+### end_rowkey
+
+扫描结束行
+
### 常用选项
Source 插件常用参数,具体请参考 [Source 常用选项](../source-common-options.md)
@@ -72,6 +87,9 @@ source {
caching = 1000
batch = 100
cache_blocks = false
+ is_binary_rowkey = false
+ start_rowkey = "B"
+ end_rowkey = "C"
schema = {
columns = [
{
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseBaseOptions.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseBaseOptions.java
index 46bb9cb0f6..60d346aac4 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseBaseOptions.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseBaseOptions.java
@@ -19,10 +19,12 @@ package org.apache.seatunnel.connectors.seatunnel.hbase.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import java.util.List;
+import java.util.Map;
-public class HbaseBaseOptions {
+public class HbaseBaseOptions extends ConnectorCommonOptions {
public static final Option<String> ZOOKEEPER_QUORUM =
Options.key("zookeeper_quorum")
@@ -38,4 +40,10 @@ public class HbaseBaseOptions {
.listType()
.noDefaultValue()
.withDescription("Hbase rowkey column");
+
+ public static final Option<Map<String, String>> HBASE_EXTRA_CONFIG =
+ Options.key("hbase_extra_config")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("Hbase extra config");
}
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index 35d9fbfcbe..dbb51a18a5 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -40,20 +40,26 @@ public class HbaseParameters implements Serializable {
private List<String> columns;
+ private boolean isBinaryRowkey;
+
+ private String startRowkey;
+
+ private String endRowkey;
+
private Map<String, String> familyNames;
private String versionColumn;
private Map<String, String> hbaseExtraConfig;
-    @Builder.Default private int caching = HbaseSinkOptions.HBASE_CACHING_CONFIG.defaultValue();
+    @Builder.Default private int caching = HbaseSourceOptions.HBASE_CACHING_CONFIG.defaultValue();
-    @Builder.Default private int batch = HbaseSinkOptions.HBASE_BATCH_CONFIG.defaultValue();
+    @Builder.Default private int batch = HbaseSourceOptions.HBASE_BATCH_CONFIG.defaultValue();
    @Builder.Default private Long ttl = HbaseSinkOptions.HBASE_TTL_CONFIG.defaultValue();
    @Builder.Default
-    private boolean cacheBlocks = HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG.defaultValue();
+    private boolean cacheBlocks = HbaseSourceOptions.HBASE_CACHE_BLOCKS_CONFIG.defaultValue();
    @Builder.Default
    private String rowkeyDelimiter = HbaseSinkOptions.ROWKEY_DELIMITER.defaultValue();
@@ -118,14 +124,24 @@ public class HbaseParameters implements Serializable {
        if (pluginConfig.getOptional(HbaseSinkOptions.HBASE_EXTRA_CONFIG).isPresent()) {
            builder.hbaseExtraConfig(pluginConfig.get(HbaseSinkOptions.HBASE_EXTRA_CONFIG));
        }
-        if (pluginConfig.getOptional(HbaseSinkOptions.HBASE_CACHING_CONFIG).isPresent()) {
-            builder.caching(pluginConfig.get(HbaseSinkOptions.HBASE_CACHING_CONFIG));
+        if (pluginConfig.getOptional(HbaseSourceOptions.HBASE_CACHING_CONFIG).isPresent()) {
+            builder.caching(pluginConfig.get(HbaseSourceOptions.HBASE_CACHING_CONFIG));
+        }
+        if (pluginConfig.getOptional(HbaseSourceOptions.HBASE_BATCH_CONFIG).isPresent()) {
+            builder.batch(pluginConfig.get(HbaseSourceOptions.HBASE_BATCH_CONFIG));
+        }
+        if (pluginConfig.getOptional(HbaseSourceOptions.HBASE_CACHE_BLOCKS_CONFIG).isPresent()) {
+            builder.cacheBlocks(pluginConfig.get(HbaseSourceOptions.HBASE_CACHE_BLOCKS_CONFIG));
+        }
+
+        if (pluginConfig.getOptional(HbaseSourceOptions.IS_BINARY_ROW_KEY).isPresent()) {
+            builder.isBinaryRowkey(pluginConfig.get(HbaseSourceOptions.IS_BINARY_ROW_KEY));
        }
-        if (pluginConfig.getOptional(HbaseSinkOptions.HBASE_BATCH_CONFIG).isPresent()) {
-            builder.batch(pluginConfig.get(HbaseSinkOptions.HBASE_BATCH_CONFIG));
+        if (pluginConfig.getOptional(HbaseSourceOptions.START_ROW_KEY).isPresent()) {
+            builder.startRowkey(pluginConfig.get(HbaseSourceOptions.START_ROW_KEY));
        }
-        if (pluginConfig.getOptional(HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG).isPresent()) {
-            builder.cacheBlocks(pluginConfig.get(HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG));
+        if (pluginConfig.getOptional(HbaseSourceOptions.END_ROW_KEY).isPresent()) {
+            builder.endRowkey(pluginConfig.get(HbaseSourceOptions.END_ROW_KEY));
        }
        return builder.build();
    }
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java
index 7a520ee5ff..fd31beadea 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java
@@ -23,7 +23,6 @@ import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import java.util.Arrays;
-import java.util.List;
import java.util.Map;
import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
@@ -32,14 +31,14 @@ import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
public class HbaseSinkOptions extends HbaseBaseOptions {
- public static final Option<List<String>> ROWKEY_COLUMNS =
- Options.key("rowkey_column")
- .listType()
- .noDefaultValue()
- .withDescription("Hbase rowkey column");
-
private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024;
+ public static final Option<Map<String, String>> FAMILY_NAME =
+ Options.key("family_name")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("Hbase column family name");
+
public static final Option<String> ROWKEY_DELIMITER =
Options.key("rowkey_delimiter")
.stringType()
@@ -77,18 +76,6 @@ public class HbaseSinkOptions extends HbaseBaseOptions {
.defaultValue(EnCoding.UTF8)
.withDescription("Hbase record encoding");
- public static final Option<Map<String, String>> FAMILY_NAME =
- Options.key("family_name")
- .mapType()
- .noDefaultValue()
- .withDescription("Hbase column family name");
-
- public static final Option<Map<String, String>> HBASE_EXTRA_CONFIG =
- Options.key("hbase_extra_config")
- .mapType()
- .noDefaultValue()
- .withDescription("Hbase extra config");
-
public static final Option<Long> HBASE_TTL_CONFIG =
Options.key("ttl")
.longType()
@@ -96,27 +83,6 @@ public class HbaseSinkOptions extends HbaseBaseOptions {
                    .withDescription(
                            "The expiration time configuration for writing hbase data. The default value is -1, indicating no expiration time.");
-    public static final Option<Boolean> HBASE_CACHE_BLOCKS_CONFIG =
-            Options.key("cache_blocks")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "When it is false, data blocks are not cached. When it is true, data blocks are cached. This value should be set to false when scanning a large amount of data to reduce memory consumption. The default value is false");
-
-    public static final Option<Integer> HBASE_CACHING_CONFIG =
-            Options.key("caching")
-                    .intType()
-                    .defaultValue(-1)
-                    .withDescription(
-                            "Set the number of rows read from the server each time can reduce the number of round trips between the client and the server, thereby improving performance. The default value is -1.");
-
-    public static final Option<Integer> HBASE_BATCH_CONFIG =
-            Options.key("batch")
-                    .intType()
-                    .defaultValue(-1)
-                    .withDescription(
-                            "Set the batch size to control the maximum number of cells returned each time, thereby controlling the amount of data returned by a single RPC call. The default value is -1.");
-
public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
.enumType(SchemaSaveMode.class)
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
index e1f151054d..5094b2ad15 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
@@ -17,4 +17,52 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.config;
-public class HbaseSourceOptions extends HbaseBaseOptions {}
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class HbaseSourceOptions extends HbaseBaseOptions {
+
+ public static final Option<String> START_ROW_KEY =
+ Options.key("start_rowkey")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Hbase scan start rowkey");
+
+ public static final Option<String> END_ROW_KEY =
+ Options.key("end_rowkey")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Hbase scan end rowkey");
+
+ public static final Option<Boolean> IS_BINARY_ROW_KEY =
+ Options.key("is_binary_rowkey")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("is binary rowkey");
+
+ public static final Option<Boolean> HBASE_CACHE_BLOCKS_CONFIG =
+ Options.key("cache_blocks")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "When it is false, data blocks are not cached. "
+                                    + "When it is true, data blocks are cached. "
+                                    + "This value should be set to false when scanning a large amount of data to reduce memory consumption. "
+ + "The default value is false");
+
+ public static final Option<Integer> HBASE_CACHING_CONFIG =
+ Options.key("caching")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+                            "Set the number of rows read from the server each time can reduce the number of round trips between the client and the server, "
+                                    + "thereby improving performance. The default value is -1.");
+
+ public static final Option<Integer> HBASE_BATCH_CONFIG =
+ Options.key("batch")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+                            "Set the batch size to control the maximum number of cells returned each time, "
+                                    + "thereby controlling the amount of data returned by a single RPC call. The default value is -1.");
+}
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
index f5508c9037..73b1d6862a 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
@@ -18,11 +18,16 @@
package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import org.apache.seatunnel.connectors.seatunnel.hbase.util.HBaseUtil;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
import lombok.extern.slf4j.Slf4j;
@@ -61,6 +66,15 @@ public class HbaseSourceSplitEnumerator
this(context, hbaseParameters, sourceState.getAssignedSplits());
}
+ @VisibleForTesting
+ public HbaseSourceSplitEnumerator(
+ Context<HbaseSourceSplit> context,
+ HbaseParameters hbaseParameters,
+ HbaseClient hbaseClient) {
+ this(context, hbaseParameters, new HashSet<>());
+ this.hbaseClient = hbaseClient;
+ }
+
private HbaseSourceSplitEnumerator(
Context<HbaseSourceSplit> context,
HbaseParameters hbaseParameters,
@@ -83,7 +97,13 @@ public class HbaseSourceSplitEnumerator
@Override
public void close() throws IOException {
- // do nothing
+ if (this.hbaseClient != null) {
+ try {
+ this.hbaseClient.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
@Override
@@ -152,23 +172,60 @@ public class HbaseSourceSplitEnumerator
context.signalNoMoreSplits(taskId);
}
- /** Get all splits of table */
- private Set<HbaseSourceSplit> getTableSplits() {
- List<HbaseSourceSplit> splits = new ArrayList<>();
+ @VisibleForTesting
+ public Set<HbaseSourceSplit> getTableSplits() {
try {
RegionLocator regionLocator =
hbaseClient.getRegionLocator(hbaseParameters.getTable());
byte[][] startKeys = regionLocator.getStartKeys();
byte[][] endKeys = regionLocator.getEndKeys();
-            if (startKeys.length != endKeys.length) {
-                throw new IOException(
-                        "Failed to create Splits for HBase table {}. HBase start keys and end keys not equal."
-                                + hbaseParameters.getTable());
-            }
+ List<HbaseSourceSplit> splits = new ArrayList<>();
+ boolean isBinaryRowkey = hbaseParameters.isBinaryRowkey();
+            byte[] userStartRowkey =
+                    HBaseUtil.convertRowKey(hbaseParameters.getStartRowkey(), isBinaryRowkey);
+            byte[] userEndRowkey =
+                    HBaseUtil.convertRowKey(hbaseParameters.getEndRowkey(), isBinaryRowkey);
+ HBaseUtil.validateRowKeyRange(userStartRowkey, userEndRowkey);
int i = 0;
while (i < startKeys.length) {
- splits.add(new HbaseSourceSplit(i, startKeys[i], endKeys[i]));
+ byte[] regionStartKey = startKeys[i];
+ byte[] regionEndKey = endKeys[i];
+                if (userEndRowkey.length > 0
+                        && Bytes.compareTo(userEndRowkey, regionStartKey) <= 0
+                        && Bytes.compareTo(regionStartKey, HConstants.EMPTY_BYTE_ARRAY) != 0) {
+                    i++;
+                    continue;
+                }
+
+                if (userStartRowkey.length > 0
+                        && Bytes.compareTo(userStartRowkey, regionEndKey) >= 0
+                        && Bytes.compareTo(regionEndKey, HConstants.EMPTY_BYTE_ARRAY) != 0) {
+                    i++;
+                    continue;
+                }
+                byte[] splitStartKey =
+                        userStartRowkey.length > 0
+                                        && (Bytes.compareTo(regionStartKey, HConstants.EMPTY_BYTE_ARRAY) == 0
+                                                || Bytes.compareTo(userStartRowkey, regionStartKey) > 0)
+                                ? userStartRowkey
+                                : regionStartKey;
+
+                byte[] splitEndKey =
+                        userEndRowkey.length > 0
+                                        && (Bytes.compareTo(regionEndKey, HConstants.EMPTY_BYTE_ARRAY) == 0
+                                                || Bytes.compareTo(userEndRowkey, regionEndKey) < 0)
+                                ? userEndRowkey
+                                : regionEndKey;
+
+                splits.add(new HbaseSourceSplit(i, splitStartKey, splitEndKey));
i++;
}
return new HashSet<>(splits);
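The loop above intersects the user-supplied range with each region's [startKey, endKey) interval, treating an empty byte array as an unbounded side. Below is a self-contained sketch of that clipping rule, with a local comparator standing in for Bytes.compareTo (illustrative only, not the connector's classes):

```java
public class RangeClipSketch {

    // Unsigned lexicographic comparison, the same ordering Bytes.compareTo uses.
    static int compare(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
            int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    /** Returns {start, end} of the region clipped to the user range, or null if they are disjoint. */
    static byte[][] clip(byte[] regionStart, byte[] regionEnd, byte[] userStart, byte[] userEnd) {
        // An empty array means "unbounded" on that side, exactly as in the enumerator above.
        if (userEnd.length > 0 && regionStart.length > 0 && compare(userEnd, regionStart) <= 0) {
            return null; // user range ends before this region starts
        }
        if (userStart.length > 0 && regionEnd.length > 0 && compare(userStart, regionEnd) >= 0) {
            return null; // user range starts after this region ends
        }
        byte[] start =
                userStart.length > 0 && (regionStart.length == 0 || compare(userStart, regionStart) > 0)
                        ? userStart
                        : regionStart;
        byte[] end =
                userEnd.length > 0 && (regionEnd.length == 0 || compare(userEnd, regionEnd) < 0)
                        ? userEnd
                        : regionEnd;
        return new byte[][] {start, end};
    }

    public static void main(String[] args) {
        // Region ["row050", "row200") clipped to user range ["row100", "row300") -> ["row100", "row200")
        byte[][] clipped =
                clip(
                        "row050".getBytes(),
                        "row200".getBytes(),
                        "row100".getBytes(),
                        "row300".getBytes());
        System.out.println(new String(clipped[0]) + " .. " + new String(clipped[1]));
    }
}
```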
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/util/HBaseUtil.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/util/HBaseUtil.java
new file mode 100644
index 0000000000..fb5507ff15
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/util/HBaseUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.util;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class HBaseUtil {
+
+ public static byte[] convertRowKey(String rowKey, boolean isBinary) {
+ if (StringUtils.isEmpty(rowKey)) {
+ return HConstants.EMPTY_BYTE_ARRAY;
+ }
+
+ if (isBinary) {
+ return Bytes.toBytesBinary(rowKey);
+ } else {
+ return Bytes.toBytes(rowKey);
+ }
+ }
+
+    public static void validateRowKeyRange(byte[] startRowKey, byte[] endRowKey) {
+        if (startRowKey.length > 0 && endRowKey.length > 0) {
+            if (Bytes.compareTo(startRowKey, endRowKey) > 0) {
+                throw new IllegalArgumentException("startRowkey can't be bigger than endRowkey");
+ }
+ }
+ }
+}
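For orientation, a small usage sketch of the new helper (illustrative, not part of the patch): in binary mode the key string goes through Bytes.toBytesBinary, so \x-style escapes such as those used in the unit test below become raw bytes, while null or empty keys collapse to HConstants.EMPTY_BYTE_ARRAY.

```java
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.seatunnel.connectors.seatunnel.hbase.util.HBaseUtil;

public class HBaseUtilUsageSketch {
    public static void main(String[] args) {
        // Text mode: the key is simply UTF-8 encoded.
        byte[] textKey = HBaseUtil.convertRowKey("row100", false);
        // Binary mode: "\x01\x02\x03" is parsed into the three bytes {0x01, 0x02, 0x03}.
        byte[] binaryKey = HBaseUtil.convertRowKey("\\x01\\x02\\x03", true);
        // Null or empty keys become HConstants.EMPTY_BYTE_ARRAY, i.e. an unbounded side.
        byte[] unbounded = HBaseUtil.convertRowKey(null, false);
        System.out.printf(
                "text=%d bytes, binary=%d bytes, unbounded=%d bytes%n",
                textKey.length, binaryKey.length, unbounded.length);
        // Throws IllegalArgumentException because the start key sorts after the end key.
        HBaseUtil.validateRowKeyRange(Bytes.toBytes("row200"), Bytes.toBytes("row100"));
    }
}
```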
diff --git a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000000..fd5eb0cceb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumeratorTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+public class HbaseSourceSplitEnumeratorTest {
+
+ @Mock private SourceSplitEnumerator.Context<HbaseSourceSplit> context;
+
+ @Mock private HbaseClient hbaseClient;
+
+ @Mock private RegionLocator regionLocator;
+
+ @Mock private HbaseParameters hbaseParameters;
+
+ private HbaseSourceSplitEnumerator enumerator;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ MockitoAnnotations.openMocks(this);
+
+        when(hbaseParameters.getTable()).thenReturn("test_table");
+        when(hbaseParameters.getZookeeperQuorum()).thenReturn("127.0.0.1:2801");
+        when(hbaseParameters.isBinaryRowkey()).thenReturn(false);
+        when(hbaseParameters.getStartRowkey()).thenReturn("");
+        when(hbaseParameters.getEndRowkey()).thenReturn("");
+        enumerator = new HbaseSourceSplitEnumerator(context, hbaseParameters, hbaseClient);
+        when(hbaseClient.getRegionLocator("test_table")).thenReturn(regionLocator);
+ }
+
+ @Test
+ void testGetTableSplitsWithSingleRegion() throws IOException {
+ byte[][] startKeys = {HConstants.EMPTY_BYTE_ARRAY};
+ byte[][] endKeys = {HConstants.EMPTY_BYTE_ARRAY};
+
+ when(regionLocator.getStartKeys()).thenReturn(startKeys);
+ when(regionLocator.getEndKeys()).thenReturn(endKeys);
+
+ Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+
+ assertNotNull(splits);
+ assertEquals(1, splits.size());
+
+ HbaseSourceSplit split = splits.iterator().next();
+ assertEquals("hbase_source_split_0", split.splitId());
+ assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, split.getStartRow());
+ assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, split.getEndRow());
+ }
+
+ @Test
+ void testGetTableSplitsWithUserDefinedRowKeyRange() throws IOException {
+        // Simulate a table with 4 regions but user only wants data from "row100" to "row300"
+ byte[][] startKeys = {
+ HConstants.EMPTY_BYTE_ARRAY,
+ Bytes.toBytes("row050"),
+ Bytes.toBytes("row200"),
+ Bytes.toBytes("row400")
+ };
+ byte[][] endKeys = {
+ Bytes.toBytes("row050"),
+ Bytes.toBytes("row200"),
+ Bytes.toBytes("row400"),
+ HConstants.EMPTY_BYTE_ARRAY
+ };
+
+ when(regionLocator.getStartKeys()).thenReturn(startKeys);
+ when(regionLocator.getEndKeys()).thenReturn(endKeys);
+ when(hbaseParameters.getStartRowkey()).thenReturn("row100");
+ when(hbaseParameters.getEndRowkey()).thenReturn("row300");
+
+ Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+
+ assertNotNull(splits);
+ assertEquals(2, splits.size()); // Should only include regions 1 and 2
+
+ // Verify the splits contain the correct row key ranges
+ boolean foundRegion1Split = false, foundRegion2Split = false;
+ for (HbaseSourceSplit split : splits) {
+ if ("hbase_source_split_1".equals(split.splitId())) {
+ foundRegion1Split = true;
+                // Start should be user's start key (row100), end should be region end (row200)
+                assertArrayEquals(Bytes.toBytes("row100"), split.getStartRow());
+                assertArrayEquals(Bytes.toBytes("row200"), split.getEndRow());
+            } else if ("hbase_source_split_2".equals(split.splitId())) {
+                foundRegion2Split = true;
+                // Start should be region start (row200), end should be user's end key (row300)
+                assertArrayEquals(Bytes.toBytes("row200"), split.getStartRow());
+ assertArrayEquals(Bytes.toBytes("row300"), split.getEndRow());
+ }
+ }
+
+ assertTrue(foundRegion1Split && foundRegion2Split);
+ }
+
+ @Test
+ void testGetTableSplitsWithBinaryRowKey() throws IOException {
+        byte[][] startKeys = {HConstants.EMPTY_BYTE_ARRAY, new byte[] {0x01, 0x02, 0x03}};
+        byte[][] endKeys = {new byte[] {0x01, 0x02, 0x03}, HConstants.EMPTY_BYTE_ARRAY};
+
+ when(regionLocator.getStartKeys()).thenReturn(startKeys);
+ when(regionLocator.getEndKeys()).thenReturn(endKeys);
+ when(hbaseParameters.isBinaryRowkey()).thenReturn(true);
+ when(hbaseParameters.getStartRowkey()).thenReturn("\\x01\\x01\\x01");
+ when(hbaseParameters.getEndRowkey()).thenReturn("\\x02\\x02\\x02");
+
+ Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+
+ assertNotNull(splits);
+ assertEquals(2, splits.size());
+ }
+
+ @Test
+    void testNoMatchingRegionsOfUserEndRowkeyLtRegionStartKey() throws IOException {
+        byte[][] startKeys = {
+            HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("row200"), Bytes.toBytes("row400")
+        };
+        byte[][] endKeys = {
+            Bytes.toBytes("row200"), Bytes.toBytes("row400"), HConstants.EMPTY_BYTE_ARRAY
+        };
+
+ when(regionLocator.getStartKeys()).thenReturn(startKeys);
+ when(regionLocator.getEndKeys()).thenReturn(endKeys);
+ when(hbaseParameters.getStartRowkey()).thenReturn("row10");
+ when(hbaseParameters.getEndRowkey()).thenReturn("row15");
+
+ Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+
+ assertNotNull(splits);
+ assertEquals(1, splits.size()); // Should include the first region
+
+ HbaseSourceSplit split = splits.iterator().next();
+ assertEquals("hbase_source_split_0", split.splitId());
+ assertArrayEquals(Bytes.toBytes("row10"), split.getStartRow());
+ assertArrayEquals(Bytes.toBytes("row15"), split.getEndRow());
+ }
+
+ @Test
+    void testNoMatchingRegionsOfUserStartRowkeyGtRegionEndKey() throws IOException {
+        byte[][] startKeys = {
+            HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("row200"), Bytes.toBytes("row400")
+        };
+        byte[][] endKeys = {
+            Bytes.toBytes("row200"), Bytes.toBytes("row400"), HConstants.EMPTY_BYTE_ARRAY
+        };
+
+ when(regionLocator.getStartKeys()).thenReturn(startKeys);
+ when(regionLocator.getEndKeys()).thenReturn(endKeys);
+ when(hbaseParameters.getStartRowkey()).thenReturn("row500");
+ when(hbaseParameters.getEndRowkey()).thenReturn("row600");
+
+ Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+
+ assertNotNull(splits);
+ assertEquals(1, splits.size()); // Should include the last region
+
+ HbaseSourceSplit split = splits.iterator().next();
+ assertEquals("hbase_source_split_2", split.splitId());
+ assertArrayEquals(Bytes.toBytes("row500"), split.getStartRow());
+ assertArrayEquals(Bytes.toBytes("row600"), split.getEndRow());
+ }
+
+ @Test
+ void testGetTableSplitsWithOnlyStartRowKey() throws IOException {
+        byte[][] startKeys = {
+            HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("row100"), Bytes.toBytes("row200")
+        };
+        byte[][] endKeys = {
+            Bytes.toBytes("row100"), Bytes.toBytes("row200"), HConstants.EMPTY_BYTE_ARRAY
+        };
+
+ when(regionLocator.getStartKeys()).thenReturn(startKeys);
+ when(regionLocator.getEndKeys()).thenReturn(endKeys);
+ when(hbaseParameters.getStartRowkey()).thenReturn("row150");
+
+ Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+
+ assertNotNull(splits);
+ assertEquals(2, splits.size()); // Should include regions 1 and 2
+
+ boolean foundRegion1Split = false, foundRegion2Split = false;
+ for (HbaseSourceSplit split : splits) {
+ if ("hbase_source_split_1".equals(split.splitId())) {
+ foundRegion1Split = true;
+                assertArrayEquals(Bytes.toBytes("row150"), split.getStartRow());
+                assertArrayEquals(Bytes.toBytes("row200"), split.getEndRow());
+            } else if ("hbase_source_split_2".equals(split.splitId())) {
+                foundRegion2Split = true;
+                assertArrayEquals(Bytes.toBytes("row200"), split.getStartRow());
+                assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, split.getEndRow());
+ }
+ }
+
+ assertTrue(foundRegion1Split && foundRegion2Split);
+ }
+
+ @Test
+ void testGetTableSplitsWithOnlyEndRowKey() throws IOException {
+ // Test with only end row key specified
+        byte[][] startKeys = {
+            HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("row100"), Bytes.toBytes("row200")
+        };
+        byte[][] endKeys = {
+            Bytes.toBytes("row100"), Bytes.toBytes("row200"), HConstants.EMPTY_BYTE_ARRAY
+        };
+
+ when(regionLocator.getStartKeys()).thenReturn(startKeys);
+ when(regionLocator.getEndKeys()).thenReturn(endKeys);
+ when(hbaseParameters.getEndRowkey()).thenReturn("row150");
+
+ Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+
+ assertNotNull(splits);
+ assertEquals(2, splits.size()); // Should include regions 0 and 1
+
+ boolean foundRegion0Split = false, foundRegion1Split = false;
+ for (HbaseSourceSplit split : splits) {
+ if ("hbase_source_split_0".equals(split.splitId())) {
+ foundRegion0Split = true;
+                assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, split.getStartRow());
+                assertArrayEquals(Bytes.toBytes("row100"), split.getEndRow());
+            } else if ("hbase_source_split_1".equals(split.splitId())) {
+                foundRegion1Split = true;
+                assertArrayEquals(Bytes.toBytes("row100"), split.getStartRow());
+ assertArrayEquals(Bytes.toBytes("row150"), split.getEndRow());
+ }
+ }
+
+ assertTrue(foundRegion0Split && foundRegion1Split);
+ }
+
+ @Test
+ void testGetTableSplitsWithExactStartRowKeyMatch() throws IOException {
+ byte[][] startKeys = {
+ HConstants.EMPTY_BYTE_ARRAY,
+ Bytes.toBytes("row100"),
+ Bytes.toBytes("row200"),
+ Bytes.toBytes("row300")
+ };
+ byte[][] endKeys = {
+ Bytes.toBytes("row100"),
+ Bytes.toBytes("row200"),
+ Bytes.toBytes("row300"),
+ HConstants.EMPTY_BYTE_ARRAY
+ };
+
+ when(regionLocator.getStartKeys()).thenReturn(startKeys);
+ when(regionLocator.getEndKeys()).thenReturn(endKeys);
+ when(hbaseParameters.getStartRowkey()).thenReturn("row100");
+
+ Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+
+ assertNotNull(splits);
+ assertEquals(3, splits.size());
+
+        boolean foundRegion1Split = false, foundRegion2Split = false, foundRegion3Split = false;
+        for (HbaseSourceSplit split : splits) {
+            if ("hbase_source_split_1".equals(split.splitId())) {
+                foundRegion1Split = true;
+                assertArrayEquals(Bytes.toBytes("row100"), split.getStartRow());
+                assertArrayEquals(Bytes.toBytes("row200"), split.getEndRow());
+            } else if ("hbase_source_split_2".equals(split.splitId())) {
+                foundRegion2Split = true;
+                assertArrayEquals(Bytes.toBytes("row200"), split.getStartRow());
+                assertArrayEquals(Bytes.toBytes("row300"), split.getEndRow());
+            } else if ("hbase_source_split_3".equals(split.splitId())) {
+                foundRegion3Split = true;
+                assertArrayEquals(Bytes.toBytes("row300"), split.getStartRow());
+                assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, split.getEndRow());
+            }
+        }
+        assertTrue(foundRegion1Split && foundRegion2Split && foundRegion3Split);
+ }
+
+ @Test
+ void testGetTableSplitsWithExactEndRowKeyMatch() throws IOException {
+ byte[][] startKeys = {
+ HConstants.EMPTY_BYTE_ARRAY,
+ Bytes.toBytes("row100"),
+ Bytes.toBytes("row200"),
+ Bytes.toBytes("row300")
+ };
+ byte[][] endKeys = {
+ Bytes.toBytes("row100"),
+ Bytes.toBytes("row200"),
+ Bytes.toBytes("row300"),
+ HConstants.EMPTY_BYTE_ARRAY
+ };
+
+ when(regionLocator.getStartKeys()).thenReturn(startKeys);
+ when(regionLocator.getEndKeys()).thenReturn(endKeys);
+ when(hbaseParameters.getEndRowkey()).thenReturn("row200");
+
+ Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+
+ assertNotNull(splits);
+ assertEquals(2, splits.size());
+
+ boolean foundRegion0Split = false, foundRegion1Split = false;
+ for (HbaseSourceSplit split : splits) {
+ if ("hbase_source_split_0".equals(split.splitId())) {
+ foundRegion0Split = true;
+                assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, split.getStartRow());
+                assertArrayEquals(Bytes.toBytes("row100"), split.getEndRow());
+            } else if ("hbase_source_split_1".equals(split.splitId())) {
+                foundRegion1Split = true;
+                assertArrayEquals(Bytes.toBytes("row100"), split.getStartRow());
+ assertArrayEquals(Bytes.toBytes("row200"), split.getEndRow());
+ }
+ }
+ assertTrue(foundRegion0Split && foundRegion1Split);
+ }
+
+ @Test
+ void testGetTableSplitsWithExactRowKeyMatch() throws IOException {
+ byte[][] startKeys = {
+ HConstants.EMPTY_BYTE_ARRAY,
+ Bytes.toBytes("row100"),
+ Bytes.toBytes("row200"),
+ Bytes.toBytes("row300")
+ };
+ byte[][] endKeys = {
+ Bytes.toBytes("row100"),
+ Bytes.toBytes("row200"),
+ Bytes.toBytes("row300"),
+ HConstants.EMPTY_BYTE_ARRAY
+ };
+
+ when(regionLocator.getStartKeys()).thenReturn(startKeys);
+ when(regionLocator.getEndKeys()).thenReturn(endKeys);
+ when(hbaseParameters.getStartRowkey()).thenReturn("row100");
+ when(hbaseParameters.getEndRowkey()).thenReturn("row200");
+
+ Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+
+ assertNotNull(splits);
+ assertEquals(1, splits.size());
+
+ HbaseSourceSplit split = splits.iterator().next();
+ assertEquals("hbase_source_split_1", split.splitId());
+ assertArrayEquals(Bytes.toBytes("row100"), split.getStartRow());
+ assertArrayEquals(Bytes.toBytes("row200"), split.getEndRow());
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index 48f3e48eee..e0a49ed7a3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -338,6 +338,33 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
() -> catalog.dropTable(TablePath.of("", "", "tmp"), false));
}
+ @TestTemplate
+ public void testHbaseSourceWithStartRowKey(TestContainer container)
+ throws IOException, InterruptedException {
+ fakeToHbaseArray(container);
+ Container.ExecResult sourceExecResult =
+ container.executeJob("/hbase-source-with-start-rowkey.conf");
+ Assertions.assertEquals(0, sourceExecResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testHbaseSourceWithEndRowKey(TestContainer container)
+ throws IOException, InterruptedException {
+ fakeToHbaseArray(container);
+ Container.ExecResult sourceExecResult =
+ container.executeJob("/hbase-source-with-end-rowkey.conf");
+ Assertions.assertEquals(0, sourceExecResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testHbaseSourceWithRowKeyRange(TestContainer container)
+ throws IOException, InterruptedException {
+ fakeToHbaseArray(container);
+ Container.ExecResult sourceExecResult =
+ container.executeJob("/hbase-source-with-rowkey-range.conf");
+ Assertions.assertEquals(0, sourceExecResult.getExitCode());
+ }
+
    private void fakeToHbase(TestContainer container) throws IOException, InterruptedException {
        deleteData(table);
        Container.ExecResult sinkExecResult = container.executeJob("/fake-to-hbase.conf");
@@ -385,6 +412,22 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
}
}
+ private void fakeToHbaseArray(TestContainer container)
+ throws IOException, InterruptedException {
+ deleteData(table);
+        Container.ExecResult sinkExecResult = container.executeJob("/fake-to-hbase-array.conf");
+ Assertions.assertEquals(0, sinkExecResult.getExitCode());
+ Table hbaseTable = hbaseConnection.getTable(table);
+ Scan scan = new Scan();
+ ResultScanner scanner = hbaseTable.getScanner(scan);
+ ArrayList<Result> results = new ArrayList<>();
+ for (Result result : scanner) {
+ results.add(result);
+ }
+ Assertions.assertEquals(results.size(), 3);
+ scanner.close();
+ }
+
private int countData(TableName table) throws IOException {
Table hbaseTable = hbaseConnection.getTable(table);
Scan scan = new Scan();
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-end-rowkey.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-end-rowkey.conf
new file mode 100644
index 0000000000..8825f9f118
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-end-rowkey.conf
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test"
+ query_columns=["rowkey", "info:name", "info:score"]
+ caching = 1000
+ batch = 100
+ cache_blocks = false
+ is_binary_rowkey = false
+ end_rowkey = "A"
+ schema = {
+ columns = [
+ {
+ name = rowkey
+ type = string
+ },
+ {
+ name = "info:name"
+ type = string
+ },
+ {
+ name = "info:score"
+ type = int
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules = {
+ row_rules = [
+ {
+ rule_type = "MIN_ROW"
+ rule_value = 1
+ },
+ {
+ rule_type = "MAX_ROW"
+ rule_value = 1
+ }
+ ]
+ }
+ }
+}
+
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
new file mode 100644
index 0000000000..3fea2048d4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test"
+ query_columns=["rowkey", "info:name", "info:score"]
+ caching = 1000
+ batch = 100
+ cache_blocks = false
+ is_binary_rowkey = false
+ start_rowkey = "B"
+ end_rowkey = "C"
+ schema = {
+ columns = [
+ {
+ name = rowkey
+ type = string
+ },
+ {
+ name = "info:name"
+ type = string
+ },
+ {
+ name = "info:score"
+ type = int
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules = {
+ row_rules = [
+ {
+ rule_type = "MIN_ROW"
+ rule_value = 2
+ },
+ {
+ rule_type = "MAX_ROW"
+ rule_value = 2
+ }
+ ]
+ }
+ }
+}
+
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-start-rowkey.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-start-rowkey.conf
new file mode 100644
index 0000000000..b01585651e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-start-rowkey.conf
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test"
+ query_columns=["rowkey", "info:name", "info:score"]
+ caching = 1000
+ batch = 100
+ cache_blocks = false
+ is_binary_rowkey = false
+ start_rowkey = "B"
+ schema = {
+ columns = [
+ {
+ name = rowkey
+ type = string
+ },
+ {
+ name = "info:name"
+ type = string
+ },
+ {
+ name = "info:score"
+ type = int
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules = {
+ row_rules = [
+ {
+ rule_type = "MIN_ROW"
+ rule_value = 2
+ },
+ {
+ rule_type = "MAX_ROW"
+ rule_value = 2
+ }
+ ]
+ }
+ }
+}
+
+