This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 40bf6560f5 [Improve][Connector-V2][HBase] Support configurable range 
scan boundary inclusion policies (#10011)
40bf6560f5 is described below

commit 40bf6560f556d368c5138dcc0a778fe9e8d15ad6
Author: Jast <[email protected]>
AuthorDate: Fri Nov 14 01:33:31 2025 +0800

    [Improve][Connector-V2][HBase] Support configurable range scan boundary 
inclusion policies (#10011)
---
 docs/en/connector-v2/source/Hbase.md                  | 19 +++++++++++++++++++
 docs/zh/connector-v2/source/Hbase.md                  | 19 +++++++++++++++++++
 .../seatunnel/hbase/client/HbaseClient.java           |  4 ++--
 .../seatunnel/hbase/config/HbaseParameters.java       | 12 ++++++++++++
 .../seatunnel/hbase/config/HbaseSourceOptions.java    | 14 ++++++++++++++
 .../apache/seatunnel/e2e/connector/hbase/HbaseIT.java | 18 ++++++++++++++++++
 ....conf => hbase-source-with-default-inclusive.conf} |  8 ++++----
 .../test/resources/hbase-source-with-end-rowkey.conf  |  1 +
 .../resources/hbase-source-with-rowkey-range.conf     |  1 +
 ...onf => hbase-source-with-start-end-inclusive.conf} | 12 ++++++------
 10 files changed, 96 insertions(+), 12 deletions(-)

diff --git a/docs/en/connector-v2/source/Hbase.md 
b/docs/en/connector-v2/source/Hbase.md
index e96831b70e..2edd6eb289 100644
--- a/docs/en/connector-v2/source/Hbase.md
+++ b/docs/en/connector-v2/source/Hbase.md
@@ -31,6 +31,8 @@ Reads data from Apache Hbase.
 | is_binary_rowkey     | boolean   | No        | false   |
 | start_rowkey         | string    | No        | -       |
 | end_rowkey           | string    | No        | -       |
+| start_row_inclusive  | boolean   | No        | true    |
+| end_row_inclusive    | boolean   | No        | false   |
 | common-options       |           | No        | -       |
 
 ### zookeeper_quorum [string]
@@ -73,6 +75,23 @@ The start row of the scan
 
 The stop row of the scan
 
+### start_row_inclusive [boolean]
+
+Whether to include the start row in the scan range. When set to true, the start row is included in the scan results; when set to false, it is excluded. Default: true (inclusive).
+
+**Note:** This flag is applied to the start row of every split, not only to `start_rowkey`. In most cases, you should keep the default value (true). Only modify this parameter if you specifically need to exclude the start row from your scan results.
+
+### end_row_inclusive [boolean]
+
+Whether to include the end row in the scan range. When set to false, the end row is excluded from the scan results, following the left-closed-right-open convention [start, end). Default: false (exclusive).
+
+**Note:** In most cases, you should keep the default value (false), which follows HBase's standard left-closed-right-open convention. Only modify this parameter if you need to include the end row in your scan results. Before this option was introduced, the end row was always included; if you rely on the `end_rowkey` row being returned, set `end_row_inclusive = true` explicitly.
+
+**Important:** When reading in parallel with multiple splits, the combination of these two parameters is critical for data integrity:
+- **Default (start_row_inclusive=true, end_row_inclusive=false)**: This is the recommended configuration and ensures no data loss or duplication across splits. Each split follows the [start, end) convention.
+- **Both false (start_row_inclusive=false, end_row_inclusive=false)**: This may cause **data loss** at split boundaries, because the boundary rows are excluded from every split.
+- **Both true (start_row_inclusive=true, end_row_inclusive=true)**: This may cause **duplicate data** at split boundaries, because the boundary rows are included in both adjacent splits.
+
 ### common-options
 
 Common parameters for Source plugins, refer to [Common Source 
Options](../source-common-options.md).
diff --git a/docs/zh/connector-v2/source/Hbase.md 
b/docs/zh/connector-v2/source/Hbase.md
index 2a0da09c1b..4d96ed9d5b 100644
--- a/docs/zh/connector-v2/source/Hbase.md
+++ b/docs/zh/connector-v2/source/Hbase.md
@@ -31,6 +31,8 @@ import ChangeLog from '../changelog/connector-hbase.md';
 | is_binary_rowkey     | boolean  | 否  | false |
 | start_rowkey         | string   | 否  | -     |
 | end_rowkey           | string   | 否  | -     |
+| start_row_inclusive  | boolean  | 否  | true  |
+| end_row_inclusive    | boolean  | 否  | false |
 | common-options       |          | 否  | -     |
 
 ### zookeeper_quorum [string]
@@ -73,6 +75,23 @@ HBase 的行键既可以是文本字符串,也可以是二进制数据。在 S
 
 扫描结束行
 
+### start_row_inclusive [boolean]
+
+设置扫描范围是否包含起始行。当设置为 true 时,扫描结果将包含起始行;设置为 false 时则不包含。默认值: true (包含)。
+
+**注意:** 该参数作用于每个 split 的起始行,而不仅仅是 `start_rowkey`。在大多数情况下,应保持默认值 (true)。仅当您有特定需求需要排除起始行时才修改此参数。
+
+### end_row_inclusive [boolean]
+
+设置扫描范围是否包含结束行。当设置为 false 时,扫描结果将不包含结束行,遵循左闭右开的区间约定 [start, end)。默认值: false (不包含)。
+
+**注意:** 在大多数情况下,应保持默认值 (false),这遵循 HBase 标准的左闭右开区间约定。仅当您需要在扫描结果中包含结束行时才修改此参数。引入此参数之前,结束行总是会被包含;如果您依赖 `end_rowkey` 对应的行出现在结果中,请显式设置 `end_row_inclusive = true`。
+
+**重要提示:** 在使用多个 split 并行读取时,这两个参数的组合对数据完整性至关重要:
+- **默认配置 (start_row_inclusive=true, end_row_inclusive=false)**: 这是推荐的配置,可以确保跨 split 时不会丢失数据或产生重复数据。每个 split 遵循 [start, end) 左闭右开区间约定。
+- **都设置为 false (start_row_inclusive=false, end_row_inclusive=false)**: 这可能会导致**数据丢失**,因为边界行会被所有 split 排除在外。
+- **都设置为 true (start_row_inclusive=true, end_row_inclusive=true)**: 这可能会导致**数据重复**,因为边界行会被相邻的两个 split 重复包含。
+
 ### 常用选项
 
 Source 插件常用参数,具体请参考 [Source 常用选项](../source-common-options.md)
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
index 0b04d10ea2..3b169ae0e4 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
@@ -368,8 +368,8 @@ public class HbaseClient {
             HbaseSourceSplit split, HbaseParameters hbaseParameters, 
List<String> columnNames)
             throws IOException {
         Scan scan = new Scan();
-        scan.withStartRow(split.getStartRow(), true);
-        scan.withStopRow(split.getEndRow(), true);
+        scan.withStartRow(split.getStartRow(), 
hbaseParameters.isStartRowInclusive());
+        scan.withStopRow(split.getEndRow(), 
hbaseParameters.isEndRowInclusive());
         scan.setCacheBlocks(hbaseParameters.isCacheBlocks());
         scan.setCaching(hbaseParameters.getCaching());
         scan.setBatch(hbaseParameters.getBatch());
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index dbb51a18a5..f334607953 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -75,6 +75,12 @@ public class HbaseParameters implements Serializable {
     @Builder.Default
     private HbaseSinkOptions.EnCoding enCoding = 
HbaseSinkOptions.ENCODING.defaultValue();
 
+    @Builder.Default
+    private boolean startRowInclusive = 
HbaseSourceOptions.START_ROW_INCLUSIVE.defaultValue();
+
+    @Builder.Default
+    private boolean endRowInclusive = 
HbaseSourceOptions.END_ROW_INCLUSIVE.defaultValue();
+
     public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
         HbaseParametersBuilder builder = HbaseParameters.builder();
         String table = config.get(HbaseBaseOptions.TABLE);
@@ -143,6 +149,12 @@ public class HbaseParameters implements Serializable {
         if 
(pluginConfig.getOptional(HbaseSourceOptions.END_ROW_KEY).isPresent()) {
             
builder.endRowkey(pluginConfig.get(HbaseSourceOptions.END_ROW_KEY));
         }
+        if 
(pluginConfig.getOptional(HbaseSourceOptions.START_ROW_INCLUSIVE).isPresent()) {
+            
builder.startRowInclusive(pluginConfig.get(HbaseSourceOptions.START_ROW_INCLUSIVE));
+        }
+        if 
(pluginConfig.getOptional(HbaseSourceOptions.END_ROW_INCLUSIVE).isPresent()) {
+            
builder.endRowInclusive(pluginConfig.get(HbaseSourceOptions.END_ROW_INCLUSIVE));
+        }
         return builder.build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
index 5094b2ad15..39ba3a7674 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java
@@ -34,6 +34,20 @@ public class HbaseSourceOptions extends HbaseBaseOptions {
                     .noDefaultValue()
                     .withDescription("Hbase scan end rowkey");
 
+    public static final Option<Boolean> START_ROW_INCLUSIVE =
+            Options.key("start_row_inclusive")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to include the start row in the scan. 
Default is true (inclusive).");
+
+    public static final Option<Boolean> END_ROW_INCLUSIVE =
+            Options.key("end_row_inclusive")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to include the end row in the scan. 
Default is false (exclusive), following the left-closed-right-open 
convention.");
+
     public static final Option<Boolean> IS_BINARY_ROW_KEY =
             Options.key("is_binary_rowkey")
                     .booleanType()
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index e0a49ed7a3..c874c36c3f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -321,6 +321,24 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(0, sourceExecResult.getExitCode());
     }
 
+    @TestTemplate
+    public void testHbaseSourceWithStartEndInclusive(TestContainer container)
+            throws IOException, InterruptedException {
+        fakeToHbaseArray(container);
+        Container.ExecResult sourceExecResult =
+                
container.executeJob("/hbase-source-with-start-end-inclusive.conf");
+        Assertions.assertEquals(0, sourceExecResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testHbaseSourceWithDefaultInclusive(TestContainer container)
+            throws IOException, InterruptedException {
+        fakeToHbaseArray(container);
+        Container.ExecResult sourceExecResult =
+                
container.executeJob("/hbase-source-with-default-inclusive.conf");
+        Assertions.assertEquals(0, sourceExecResult.getExitCode());
+    }
+
     @TestTemplate
     public void testCatalog(TestContainer container) {
         // create exiting table
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-default-inclusive.conf
similarity index 88%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-default-inclusive.conf
index 3fea2048d4..cbeda4e60a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-default-inclusive.conf
@@ -15,14 +15,13 @@
 # limitations under the License.
 #
 
-
 env {
   parallelism = 1
   job.mode = "BATCH"
 }
 
 source {
-    Hbase {
+  Hbase {
       zookeeper_quorum = "hbase_e2e:2181"
       table = "seatunnel_test"
       query_columns=["rowkey", "info:name", "info:score"]
@@ -30,8 +29,10 @@ source {
       batch = 100
       cache_blocks = false
       is_binary_rowkey = false
-      start_rowkey = "B"
+      start_rowkey = "A"
       end_rowkey = "C"
+      # Test default values: start_row_inclusive = true (default), end_row_inclusive = false (default)
+      # This should scan [A, C), which includes A and B but excludes C
       schema = {
         columns = [
           {
@@ -68,4 +69,3 @@ sink {
   }
 }
 
-
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-end-rowkey.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-end-rowkey.conf
index 8825f9f118..19d3226e92 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-end-rowkey.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-end-rowkey.conf
@@ -31,6 +31,7 @@ source {
       cache_blocks = false
       is_binary_rowkey = false
       end_rowkey = "A"
+      end_row_inclusive = true
       schema = {
         columns = [
           {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
index 3fea2048d4..71ff3833d8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
@@ -32,6 +32,7 @@ source {
       is_binary_rowkey = false
       start_rowkey = "B"
       end_rowkey = "C"
+      end_row_inclusive = true
       schema = {
         columns = [
           {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-start-end-inclusive.conf
similarity index 91%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-start-end-inclusive.conf
index 3fea2048d4..a33c9e704c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-rowkey-range.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-start-end-inclusive.conf
@@ -15,14 +15,13 @@
 # limitations under the License.
 #
 
-
 env {
   parallelism = 1
   job.mode = "BATCH"
 }
 
 source {
-    Hbase {
+  Hbase {
       zookeeper_quorum = "hbase_e2e:2181"
       table = "seatunnel_test"
       query_columns=["rowkey", "info:name", "info:score"]
@@ -30,8 +29,10 @@ source {
       batch = 100
       cache_blocks = false
       is_binary_rowkey = false
-      start_rowkey = "B"
+      start_rowkey = "A"
       end_rowkey = "C"
+      start_row_inclusive = true
+      end_row_inclusive = true
       schema = {
         columns = [
           {
@@ -57,15 +58,14 @@ sink {
       row_rules = [
         {
           rule_type = "MIN_ROW"
-          rule_value = 2
+          rule_value = 3
         },
         {
           rule_type = "MAX_ROW"
-          rule_value = 2
+          rule_value = 3
         }
       ]
     }
   }
 }
 
-

Reply via email to