This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 40bf6560f5 [Improve][Connector-V2][HBase] Support configurable range
scan boundary inclusion policies (#10011)
40bf6560f5 is described below
commit 40bf6560f556d368c5138dcc0a778fe9e8d15ad6
Author: Jast <[email protected]>
AuthorDate: Fri Nov 14 01:33:31 2025 +0800
[Improve][Connector-V2][HBase] Support configurable range scan boundary
inclusion policies (#10011)
---
docs/en/connector-v2/source/Hbase.md | 19 +++++++++++++++++++
docs/zh/connector-v2/source/Hbase.md | 19 +++++++++++++++++++
.../seatunnel/hbase/client/HbaseClient.java | 4 ++--
.../seatunnel/hbase/config/HbaseParameters.java | 12 ++++++++++++
.../seatunnel/hbase/config/HbaseSourceOptions.java | 14 ++++++++++++++
.../apache/seatunnel/e2e/connector/hbase/HbaseIT.java | 18 ++++++++++++++++++
....conf => hbase-source-with-default-inclusive.conf} | 8 ++++----
.../test/resources/hbase-source-with-end-rowkey.conf | 1 +
.../resources/hbase-source-with-rowkey-range.conf | 1 +
...onf => hbase-source-with-start-end-inclusive.conf} | 12 ++++++------
10 files changed, 96 insertions(+), 12 deletions(-)
diff --git a/docs/en/connector-v2/source/Hbase.md
b/docs/en/connector-v2/source/Hbase.md
index e96831b70e..2edd6eb289 100644
--- a/docs/en/connector-v2/source/Hbase.md
+++ b/docs/en/connector-v2/source/Hbase.md
@@ -31,6 +31,8 @@ Reads data from Apache Hbase.
| is_binary_rowkey | boolean | No | false |
| start_rowkey | string | No | - |
| end_rowkey | string | No | - |
+| start_row_inclusive | boolean | No | true |
+| end_row_inclusive | boolean | No | false |
| common-options | | No | - |
### zookeeper_quorum [string]
@@ -73,6 +75,23 @@ The start row of the scan
The stop row of the scan
+### start_row_inclusive [boolean]
+
+Whether to include the start row in the scan range. When set to true, the
start row is included in the scan results; when set to false, it is skipped. Default: true (inclusive).
+
+**Note:** In most cases, you should keep the default value (true). Only modify
this parameter if you have specific requirements for excluding the start row
from your scan results.
+
+### end_row_inclusive [boolean]
+
+Whether to include the end row in the scan range. When set to true, the end
row is included in the scan results; when set to false, it is excluded. Together with
the default `start_row_inclusive = true`, this gives the left-closed-right-open interval [start, end). Default: false (exclusive).
+
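+**Note:** In most cases, you should keep the default value (false), which
follows HBase's standard left-closed-right-open convention. Only modify this
parameter if you need to include the end row in your scan results. Earlier versions of this connector always included the end row; set `end_row_inclusive = true` to keep that behavior.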
+
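+**Important:** Both parameters are applied to the start and end row of every split, not only to `start_rowkey` and `end_rowkey`. When using parallel reading with multiple splits, the
combination of these two parameters is therefore critical for data integrity: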
+- **Default (start_row_inclusive=true, end_row_inclusive=false)**: This is the
recommended configuration that ensures no data loss or duplication across
splits. Each split follows the [start, end) convention.
+- **Both false (start_row_inclusive=false, end_row_inclusive=false)**: This
may cause **data loss** at split boundaries, as the boundary rows will be
excluded from all splits.
+- **Both true (start_row_inclusive=true, end_row_inclusive=true)**: This may
cause **duplicate data** at split boundaries, as each boundary row will be
included in both adjacent splits.
+
### common-options
Common parameters for Source plugins, refer to [Common Source
Options](../source-common-options.md).
diff --git a/docs/zh/connector-v2/source/Hbase.md
b/docs/zh/connector-v2/source/Hbase.md
index 2a0da09c1b..4d96ed9d5b 100644
--- a/docs/zh/connector-v2/source/Hbase.md
+++ b/docs/zh/connector-v2/source/Hbase.md
@@ -31,6 +31,8 @@ import ChangeLog from '../changelog/connector-hbase.md';
| is_binary_rowkey | boolean | 否 | false |
| start_rowkey | string | 否 | - |
| end_rowkey | string | 否 | - |
+| start_row_inclusive | boolean | 否 | true |
+| end_row_inclusive | boolean | 否 | false |
| common-options | | 否 | - |
### zookeeper_quorum [string]
@@ -73,6 +75,23 @@ HBase 的行键既可以是文本字符串,也可以是二进制数据。在 S
扫描结束行
+### start_row_inclusive [boolean]
+
+设置扫描范围是否包含起始行。当设置为 true 时,扫描结果将包含起始行。默认值: true (包含)。
+
+**注意:** 在大多数情况下,应保持默认值 (true)。仅当您有特定需求需要排除起始行时才修改此参数。
+
+### end_row_inclusive [boolean]
+
+设置扫描范围是否包含结束行。当设置为 false 时,扫描结果将不包含结束行,遵循左闭右开的区间约定 [start, end)。默认值: false
(不包含)。
+
+**注意:** 在大多数情况下,应保持默认值 (false),这遵循 HBase 标准的左闭右开区间约定。仅当您需要在扫描结果中包含结束行时才修改此参数。此前版本的连接器始终包含结束行;如需保持原有行为,请设置 `end_row_inclusive = true`。
+
+**重要提示:** 这两个参数会作用于每个 split 的起始行和结束行,而不仅仅是 `start_rowkey` 和 `end_rowkey`。因此在使用多个 split 并行读取时,这两个参数的组合对数据完整性至关重要:
+- **默认配置 (start_row_inclusive=true, end_row_inclusive=false)**: 这是推荐的配置,可以确保跨
split 时不会丢失数据或产生重复数据。每个 split 遵循 [start, end) 左闭右开区间约定。
+- **都设置为 false (start_row_inclusive=false, end_row_inclusive=false)**:
这可能会导致**数据丢失**,因为边界行会被所有 split 排除在外。
+- **都设置为 true (start_row_inclusive=true, end_row_inclusive=true)**:
这可能会导致**数据重复**,因为边界行会被相邻的多个 split 重复包含。
+
### 常用选项
Source 插件常用参数,具体请参考 [Source 常用选项](../source-common-options.md)
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
index 0b04d10ea2..3b169ae0e4 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
@@ -368,8 +368,8 @@ public class HbaseClient {
HbaseSourceSplit split, HbaseParameters hbaseParameters,
List<String> columnNames)
throws IOException {
Scan scan = new Scan();
- scan.withStartRow(split.getStartRow(), true);
- scan.withStopRow(split.getEndRow(), true);
+ scan.withStartRow(split.getStartRow(),
hbaseParameters.isStartRowInclusive());
+ scan.withStopRow(split.getEndRow(),
hbaseParameters.isEndRowInclusive());
scan.setCacheBlocks(hbaseParameters.isCacheBlocks());
scan.setCaching(hbaseParameters.getCaching());
scan.setBatch(hbaseParameters.getBatch());
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index dbb51a18a5..f334607953 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -75,6 +75,12 @@ public class HbaseParameters implements Serializable {
@Builder.Default
private HbaseSinkOptions.EnCoding enCoding =
HbaseSinkOptions.ENCODING.defaultValue();
+ @Builder.Default
+ private boolean startRowInclusive =
HbaseSourceOptions.START_ROW_INCLUSIVE.defaultValue();
+
+ @Builder.Default
+ private boolean endRowInclusive =
HbaseSourceOptions.END_ROW_INCLUSIVE.defaultValue();
+
public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
HbaseParametersBuilder builder = HbaseParameters.builder();
String table = config.get(HbaseBaseOptions.TABLE);
@@ -143,6 +149,12 @@ public class HbaseParameters implements Serializable {
if
(pluginConfig.getOptional(HbaseSourceOptions.END_ROW_KEY).isPresent()) {
builder.endRowkey(pluginConfig.get(HbaseSourceOptions.END_ROW_KEY));
}
+ if
(pluginConfig.getOptional(HbaseSourceOptions.START_ROW_INCLUSIVE).isPresent()) {
+
builder.startRowInclusive(pluginConfig.get(HbaseSourceOptions.START_ROW_INCLUSIVE));
+ }
+ if
(pluginConfig.getOptional(HbaseSourceOptions.END_ROW_INCLUSIVE).isPresent()) {
+
builder.endRowInclusive(pluginConfig.get(HbaseSourceOptions.END_ROW_INCLUSIVE));
+ }
return builder.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
index 5094b2ad15..39ba3a7674 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
@@ -34,6 +34,20 @@ public class HbaseSourceOptions extends HbaseBaseOptions {
.noDefaultValue()
.withDescription("Hbase scan end rowkey");
+ public static final Option<Boolean> START_ROW_INCLUSIVE =
+ Options.key("start_row_inclusive")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to include the start row in the scan.
Default is true (inclusive).");
+
+ public static final Option<Boolean> END_ROW_INCLUSIVE =
+ Options.key("end_row_inclusive")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to include the end row in the scan.
Default is false (exclusive), following the left-closed-right-open
convention.");
+
public static final Option<Boolean> IS_BINARY_ROW_KEY =
Options.key("is_binary_rowkey")
.booleanType()
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index e0a49ed7a3..c874c36c3f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -321,6 +321,24 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, sourceExecResult.getExitCode());
}
+ @TestTemplate
+ public void testHbaseSourceWithStartEndInclusive(TestContainer container)
+ throws IOException, InterruptedException {
+ fakeToHbaseArray(container);
+ Container.ExecResult sourceExecResult =
+
container.executeJob("/hbase-source-with-start-end-inclusive.conf");
+ Assertions.assertEquals(0, sourceExecResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testHbaseSourceWithDefaultInclusive(TestContainer container)
+ throws IOException, InterruptedException {
+ fakeToHbaseArray(container);
+ Container.ExecResult sourceExecResult =
+
container.executeJob("/hbase-source-with-default-inclusive.conf");
+ Assertions.assertEquals(0, sourceExecResult.getExitCode());
+ }
+
@TestTemplate
public void testCatalog(TestContainer container) {
// create exiting table
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-default-inclusive.conf
similarity index 88%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-default-inclusive.conf
index 3fea2048d4..cbeda4e60a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-default-inclusive.conf
@@ -15,14 +15,13 @@
# limitations under the License.
#
-
env {
parallelism = 1
job.mode = "BATCH"
}
source {
- Hbase {
+ Hbase {
zookeeper_quorum = "hbase_e2e:2181"
table = "seatunnel_test"
query_columns=["rowkey", "info:name", "info:score"]
@@ -30,8 +29,10 @@ source {
batch = 100
cache_blocks = false
is_binary_rowkey = false
- start_rowkey = "B"
+ start_rowkey = "A"
end_rowkey = "C"
+ # Test default values: start_row_inclusive = true (default),
end_row_inclusive = false (default)
+ # This should scan [A, C), which includes A and B, but excludes C
schema = {
columns = [
{
@@ -68,4 +69,3 @@ sink {
}
}
-
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-end-rowkey.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-end-rowkey.conf
index 8825f9f118..19d3226e92 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-end-rowkey.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-end-rowkey.conf
@@ -31,6 +31,7 @@ source {
cache_blocks = false
is_binary_rowkey = false
end_rowkey = "A"
+ end_row_inclusive = true
schema = {
columns = [
{
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
index 3fea2048d4..71ff3833d8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
@@ -32,6 +32,7 @@ source {
is_binary_rowkey = false
start_rowkey = "B"
end_rowkey = "C"
+ end_row_inclusive = true
schema = {
columns = [
{
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-start-end-inclusive.conf
similarity index 91%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-start-end-inclusive.conf
index 3fea2048d4..a33c9e704c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-start-end-inclusive.conf
@@ -15,14 +15,13 @@
# limitations under the License.
#
-
env {
parallelism = 1
job.mode = "BATCH"
}
source {
- Hbase {
+ Hbase {
zookeeper_quorum = "hbase_e2e:2181"
table = "seatunnel_test"
query_columns=["rowkey", "info:name", "info:score"]
@@ -30,8 +29,10 @@ source {
batch = 100
cache_blocks = false
is_binary_rowkey = false
- start_rowkey = "B"
+ start_rowkey = "A"
end_rowkey = "C"
+ start_row_inclusive = true
+ end_row_inclusive = true
schema = {
columns = [
{
@@ -57,15 +58,14 @@ sink {
row_rules = [
{
rule_type = "MIN_ROW"
- rule_value = 2
+ rule_value = 3
},
{
rule_type = "MAX_ROW"
- rule_value = 2
+ rule_value = 3
}
]
}
}
}
-