This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d722474bc3 [Fix] [connector-hbase] Fix namespace handling for HBase source (#10295)
d722474bc3 is described below

commit d722474bc36ce52735fff1b81af5527ef0bd44ca
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Jan 28 10:08:56 2026 +0800

    [Fix] [connector-hbase] Fix namespace handling for HBase source (#10295)
    
    Co-authored-by: zengyi <[email protected]>
---
 docs/en/connectors/sink/Hbase.md                   |   1 +
 docs/en/connectors/source/Hbase.md                 |   1 +
 docs/zh/connectors/sink/Hbase.md                   |   1 +
 docs/zh/connectors/source/Hbase.md                 |   1 +
 .../seatunnel/hbase/client/HbaseClient.java        |  13 +-
 .../seatunnel/hbase/config/HbaseParameters.java    |  14 +-
 .../hbase/source/HbaseSourceSplitEnumerator.java   | 150 +++++++++++++--------
 .../hbase/config/HbaseParametersTest.java          |  95 +++++++++++++
 .../source/HbaseSourceSplitEnumeratorTest.java     | 146 +++++++++++++++-----
 .../seatunnel/e2e/connector/hbase/HbaseIT.java     |  24 ++++
 .../resources/hbase-source-with-namespace.conf     |  73 ++++++++++
 11 files changed, 430 insertions(+), 89 deletions(-)

diff --git a/docs/en/connectors/sink/Hbase.md b/docs/en/connectors/sink/Hbase.md
index 9d4d40e60e..104fde4955 100644
--- a/docs/en/connectors/sink/Hbase.md
+++ b/docs/en/connectors/sink/Hbase.md
@@ -37,6 +37,7 @@ The zookeeper cluster host of hbase, example: 
"hadoop001:2181,hadoop002:2181,had
 ### table [string]
 
 The table name you want to write, example: "seatunnel"
+If your table is under a custom namespace, use `namespace:table` (for example, 
`ns1:seatunnel_test`); if omitted, SeaTunnel will write to HBase's default 
namespace (`default`).
 
 ### rowkey_column [list]
 
diff --git a/docs/en/connectors/source/Hbase.md 
b/docs/en/connectors/source/Hbase.md
index dae2873292..8c837b670e 100644
--- a/docs/en/connectors/source/Hbase.md
+++ b/docs/en/connectors/source/Hbase.md
@@ -44,6 +44,7 @@ The zookeeper quorum for Hbase cluster hosts, e.g., 
"hadoop001:2181,hadoop002:21
 ### table [string]
 
 The name of the table to write to, e.g., "seatunnel".
+If your table lives in a custom namespace, use the `namespace:table` form (for 
example, `ns1:seatunnel_test`); when the namespace is omitted SeaTunnel will 
read from HBase's default namespace (`default`).
 
 ### schema [config]
 
diff --git a/docs/zh/connectors/sink/Hbase.md b/docs/zh/connectors/sink/Hbase.md
index fc4c75e9c0..f42916c5e0 100644
--- a/docs/zh/connectors/sink/Hbase.md
+++ b/docs/zh/connectors/sink/Hbase.md
@@ -37,6 +37,7 @@ hbase的zookeeper集群主机, 示例: 
"hadoop001:2181,hadoop002:2181,hadoop003:
 ### table [string]
 
 要写入的表名, 例如: "seatunnel"
+如果表在自定义 namespace 下,请使用 `namespace:table` 形式(如 `ns1:seatunnel_test`);未填写 
namespace 时,SeaTunnel 会写入到 HBase 默认命名空间 `default`。
 
 ### rowkey_column [list]
 
diff --git a/docs/zh/connectors/source/Hbase.md 
b/docs/zh/connectors/source/Hbase.md
index aabfc37380..b115e42dc8 100644
--- a/docs/zh/connectors/source/Hbase.md
+++ b/docs/zh/connectors/source/Hbase.md
@@ -44,6 +44,7 @@ hbase的zookeeper集群主机,例如:“hadoop001:2181,hadoop002:2181,hadoop
 ### table [string]
 
 要写入的表名,例如:“seatunnel”
+如果表在自定义 namespace 下,请使用 `namespace:table` 形式(如 `ns1:seatunnel_test`);未填写 
namespace 时,SeaTunnel 会使用 HBase 的默认命名空间 `default`。
 
 ### schema [config]
 
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
index 70c393f799..64a5cbb102 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
@@ -368,7 +368,9 @@ public class HbaseClient {
             throws IOException {
         Scan scan = buildScan(split, hbaseParameters, columnNames);
         return this.connection
-                .getTable(TableName.valueOf(hbaseParameters.getTable()))
+                .getTable(
+                        TableName.valueOf(
+                                hbaseParameters.getNamespace(), 
hbaseParameters.getTable()))
                 .getScanner(scan);
     }
 
@@ -416,11 +418,18 @@ public class HbaseClient {
     /**
      * Get a RegionLocator.
      *
-     * @param tableName table name
+     * @param tableName table name (preferably fully qualified as {@code 
namespace:table})
      * @return RegionLocator
      * @throws IOException exception
+     * @deprecated Use {@link #getRegionLocator(String, String)} instead to 
avoid relying on the
+     *     default namespace behavior.
      */
+    @Deprecated
     public RegionLocator getRegionLocator(String tableName) throws IOException 
{
         return this.connection.getRegionLocator(TableName.valueOf(tableName));
     }
+
+    public RegionLocator getRegionLocator(String namespace, String tableName) 
throws IOException {
+        return this.connection.getRegionLocator(TableName.valueOf(namespace, 
tableName));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index a6e6a04e77..7068c4a6e5 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -19,6 +19,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.hbase.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+
 import lombok.Builder;
 import lombok.Getter;
 
@@ -30,6 +32,8 @@ import java.util.Map;
 @Getter
 public class HbaseParameters implements Serializable {
 
+    public static final String DEFAULT_NAMESPACE = 
NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
+
     private String zookeeperQuorum;
 
     private String namespace;
@@ -95,7 +99,7 @@ public class HbaseParameters implements Serializable {
             builder.table(table.substring(colonIndex + 1));
         } else {
             builder.table(table);
-            builder.namespace("default");
+            builder.namespace(DEFAULT_NAMESPACE);
         }
 
         // required parameters
@@ -129,6 +133,7 @@ public class HbaseParameters implements Serializable {
             builder.table(table.substring(colonIndex + 1));
         } else {
             builder.table(table);
+            builder.namespace(DEFAULT_NAMESPACE);
         }
 
         if 
(pluginConfig.getOptional(HbaseSinkOptions.HBASE_EXTRA_CONFIG).isPresent()) {
@@ -168,4 +173,11 @@ public class HbaseParameters implements Serializable {
         }
         return builder.build();
     }
+
+    public String getNamespace() {
+        if (namespace == null || namespace.trim().isEmpty()) {
+            return DEFAULT_NAMESPACE;
+        }
+        return namespace;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
index 54306ef6ec..744130240a 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
@@ -23,9 +23,12 @@ import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTestin
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
 import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.hbase.util.HBaseUtil;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -59,14 +62,14 @@ public class HbaseSourceSplitEnumerator
 
     public HbaseSourceSplitEnumerator(
             Context<HbaseSourceSplit> context, HbaseParameters 
hbaseParameters) {
-        this(context, hbaseParameters, new HashSet<>());
+        this(context, hbaseParameters, new HashSet<>(), null);
     }
 
     public HbaseSourceSplitEnumerator(
             Context<HbaseSourceSplit> context,
             HbaseParameters hbaseParameters,
             HbaseSourceState sourceState) {
-        this(context, hbaseParameters, sourceState.getAssignedSplits());
+        this(context, hbaseParameters, sourceState.getAssignedSplits(), null);
     }
 
     @VisibleForTesting
@@ -90,7 +93,7 @@ public class HbaseSourceSplitEnumerator
             Context<HbaseSourceSplit> context,
             HbaseParameters hbaseParameters,
             Set<HbaseSourceSplit> assignedSplit) {
-        this(context, hbaseParameters, assignedSplit, 
HbaseClient.createInstance(hbaseParameters));
+        this(context, hbaseParameters, assignedSplit, null);
     }
 
     private HbaseSourceSplitEnumerator(
@@ -216,64 +219,103 @@ public class HbaseSourceSplitEnumerator
 
     @VisibleForTesting
     public Set<HbaseSourceSplit> getTableSplits() {
-
+        String namespace = hbaseParameters.getNamespace();
+        TableName tableName = TableName.valueOf(namespace, 
hbaseParameters.getTable());
         try {
-            RegionLocator regionLocator = 
hbaseClient.getRegionLocator(hbaseParameters.getTable());
-            byte[][] startKeys = regionLocator.getStartKeys();
-            byte[][] endKeys = regionLocator.getEndKeys();
-            List<HbaseSourceSplit> splits = new ArrayList<>();
-            boolean isBinaryRowkey = hbaseParameters.isBinaryRowkey();
-            byte[] userStartRowkey =
-                    HBaseUtil.convertRowKey(hbaseParameters.getStartRowkey(), 
isBinaryRowkey);
-            byte[] userEndRowkey =
-                    HBaseUtil.convertRowKey(hbaseParameters.getEndRowkey(), 
isBinaryRowkey);
-            HBaseUtil.validateRowKeyRange(userStartRowkey, userEndRowkey);
-
-            int i = 0;
-            while (i < startKeys.length) {
-                byte[] regionStartKey = startKeys[i];
-                byte[] regionEndKey = endKeys[i];
-                if (userEndRowkey.length > 0
-                        && Bytes.compareTo(userEndRowkey, regionStartKey) <= 0
-                        && Bytes.compareTo(regionStartKey, 
HConstants.EMPTY_BYTE_ARRAY) != 0) {
-                    i++;
-                    continue;
-                }
+            HbaseClient hbaseClient = getHbaseClient();
+            log.info("Enumerating HBase source splits for table [{}]", 
tableName.getNameAsString());
+            if (!hbaseClient.tableExists(tableName.getNameAsString())) {
+                String errorMsg =
+                        String.format(
+                                "HBase table [%s] does not exist", 
tableName.getNameAsString());
+                log.error(errorMsg);
+                throw new HbaseConnectorException(
+                        HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION, 
errorMsg);
+            }
 
-                if (userStartRowkey.length > 0
-                        && Bytes.compareTo(userStartRowkey, regionEndKey) >= 0
-                        && Bytes.compareTo(regionEndKey, 
HConstants.EMPTY_BYTE_ARRAY) != 0) {
+            try (RegionLocator regionLocator =
+                    hbaseClient.getRegionLocator(namespace, 
hbaseParameters.getTable())) {
+                byte[][] startKeys = regionLocator.getStartKeys();
+                byte[][] endKeys = regionLocator.getEndKeys();
+                if (startKeys.length == 0 || endKeys.length == 0) {
+                    String errorMsg =
+                            String.format(
+                                    "No region information found for HBase 
table [%s], please check whether the table exists "
+                                            + "and current user has permission 
to access it",
+                                    tableName.getNameAsString());
+                    log.error(errorMsg);
+                    throw new HbaseConnectorException(
+                            HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION, 
errorMsg);
+                }
+                List<HbaseSourceSplit> splits = new ArrayList<>();
+                boolean isBinaryRowkey = hbaseParameters.isBinaryRowkey();
+                byte[] userStartRowkey =
+                        
HBaseUtil.convertRowKey(hbaseParameters.getStartRowkey(), isBinaryRowkey);
+                byte[] userEndRowkey =
+                        
HBaseUtil.convertRowKey(hbaseParameters.getEndRowkey(), isBinaryRowkey);
+                HBaseUtil.validateRowKeyRange(userStartRowkey, userEndRowkey);
+
+                int i = 0;
+                while (i < startKeys.length) {
+                    byte[] regionStartKey = startKeys[i];
+                    byte[] regionEndKey = endKeys[i];
+                    if (userEndRowkey.length > 0
+                            && Bytes.compareTo(userEndRowkey, regionStartKey) 
<= 0
+                            && Bytes.compareTo(regionStartKey, 
HConstants.EMPTY_BYTE_ARRAY) != 0) {
+                        i++;
+                        continue;
+                    }
+
+                    if (userStartRowkey.length > 0
+                            && Bytes.compareTo(userStartRowkey, regionEndKey) 
>= 0
+                            && Bytes.compareTo(regionEndKey, 
HConstants.EMPTY_BYTE_ARRAY) != 0) {
+                        i++;
+                        continue;
+                    }
+                    byte[] splitStartKey =
+                            userStartRowkey.length > 0
+                                            && (Bytes.compareTo(
+                                                                    
regionStartKey,
+                                                                    
HConstants.EMPTY_BYTE_ARRAY)
+                                                            == 0
+                                                    || Bytes.compareTo(
+                                                                    
userStartRowkey, regionStartKey)
+                                                            > 0)
+                                    ? userStartRowkey
+                                    : regionStartKey;
+
+                    byte[] splitEndKey =
+                            userEndRowkey.length > 0
+                                            && (Bytes.compareTo(
+                                                                    
regionEndKey,
+                                                                    
HConstants.EMPTY_BYTE_ARRAY)
+                                                            == 0
+                                                    || 
Bytes.compareTo(userEndRowkey, regionEndKey)
+                                                            < 0)
+                                    ? userEndRowkey
+                                    : regionEndKey;
+
+                    splits.add(new HbaseSourceSplit(i, splitStartKey, 
splitEndKey));
                     i++;
-                    continue;
                 }
-                byte[] splitStartKey =
-                        userStartRowkey.length > 0
-                                        && (Bytes.compareTo(
-                                                                regionStartKey,
-                                                                
HConstants.EMPTY_BYTE_ARRAY)
-                                                        == 0
-                                                || 
Bytes.compareTo(userStartRowkey, regionStartKey)
-                                                        > 0)
-                                ? userStartRowkey
-                                : regionStartKey;
-
-                byte[] splitEndKey =
-                        userEndRowkey.length > 0
-                                        && (Bytes.compareTo(
-                                                                regionEndKey,
-                                                                
HConstants.EMPTY_BYTE_ARRAY)
-                                                        == 0
-                                                || 
Bytes.compareTo(userEndRowkey, regionEndKey) < 0)
-                                ? userEndRowkey
-                                : regionEndKey;
-
-                splits.add(new HbaseSourceSplit(i, splitStartKey, 
splitEndKey));
-                i++;
+                return new HashSet<>(splits);
             }
-            return new HashSet<>(splits);
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            String errorMsg =
+                    String.format(
+                            "Failed to enumerate splits for HBase table [%s]",
+                            tableName.getNameAsString());
+            log.error(errorMsg, e);
+            throw new HbaseConnectorException(
+                    HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION, errorMsg, 
e);
+        }
+    }
+
+    private synchronized HbaseClient getHbaseClient() {
+        if (hbaseClient == null) {
+            hbaseClient = HbaseClient.createInstance(hbaseParameters);
         }
+        return hbaseClient;
     }
 
     /** Hash algorithm for assigning splits to readers */
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParametersTest.java
 
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParametersTest.java
index 34cf23aa34..c0ce4912fe 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParametersTest.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParametersTest.java
@@ -28,6 +28,30 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class HbaseParametersTest {
 
+    @Test
+    void testBuildWithSourceConfigWithoutNamespace() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), 
"127.0.0.1:2181");
+        configMap.put(HbaseBaseOptions.TABLE.key(), "tbl");
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+
+        HbaseParameters parameters = 
HbaseParameters.buildWithSourceConfig(readonlyConfig);
+        assertEquals(HbaseParameters.DEFAULT_NAMESPACE, 
parameters.getNamespace());
+        assertEquals("tbl", parameters.getTable());
+    }
+
+    @Test
+    void testBuildWithSourceConfigWithNamespace() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), 
"127.0.0.1:2181");
+        configMap.put(HbaseBaseOptions.TABLE.key(), "test:tbl");
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+
+        HbaseParameters parameters = 
HbaseParameters.buildWithSourceConfig(readonlyConfig);
+        assertEquals("test", parameters.getNamespace());
+        assertEquals("tbl", parameters.getTable());
+    }
+
     @Test
     void testBuildWithSourceConfigReadsTimeRange() {
         Map<String, Object> config = new HashMap<>();
@@ -42,4 +66,75 @@ public class HbaseParametersTest {
         assertEquals(1000L, parameters.getStartTimestamp());
         assertEquals(2000L, parameters.getEndTimestamp());
     }
+
+    @Test
+    void testGetNamespaceReturnsDefaultWhenNull() {
+        HbaseParameters parameters =
+                HbaseParameters.builder()
+                        .namespace(null)
+                        .table("tbl")
+                        .zookeeperQuorum("127.0.0.1:2181")
+                        .build();
+        assertEquals(HbaseParameters.DEFAULT_NAMESPACE, 
parameters.getNamespace());
+    }
+
+    @Test
+    void testBuildWithSourceConfigWithLeadingColonUsesDefaultNamespace() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), 
"127.0.0.1:2181");
+        configMap.put(HbaseBaseOptions.TABLE.key(), ":tbl");
+
+        HbaseParameters parameters =
+                
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
+        assertEquals(HbaseParameters.DEFAULT_NAMESPACE, 
parameters.getNamespace());
+        assertEquals("tbl", parameters.getTable());
+    }
+
+    @Test
+    void testBuildWithSourceConfigWithMultipleColons() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), 
"127.0.0.1:2181");
+        configMap.put(HbaseBaseOptions.TABLE.key(), "ns:tbl:extra");
+
+        HbaseParameters parameters =
+                
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
+        assertEquals("ns", parameters.getNamespace());
+        assertEquals("tbl:extra", parameters.getTable());
+    }
+
+    @Test
+    void testBuildWithSourceConfigWithSpaces() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), 
"127.0.0.1:2181");
+        configMap.put(HbaseBaseOptions.TABLE.key(), " ns : tbl ");
+
+        HbaseParameters parameters =
+                
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
+        assertEquals(" ns ", parameters.getNamespace());
+        assertEquals(" tbl ", parameters.getTable());
+    }
+
+    @Test
+    void testBuildWithSourceConfigWithEmptyTableName() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), 
"127.0.0.1:2181");
+        configMap.put(HbaseBaseOptions.TABLE.key(), "test:");
+
+        HbaseParameters parameters =
+                
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
+        assertEquals("test", parameters.getNamespace());
+        assertEquals("", parameters.getTable());
+    }
+
+    @Test
+    void testBuildWithSourceConfigWithoutNamespaceKeepsSpacesInTableName() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), 
"127.0.0.1:2181");
+        configMap.put(HbaseBaseOptions.TABLE.key(), " tbl ");
+
+        HbaseParameters parameters =
+                
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
+        assertEquals(HbaseParameters.DEFAULT_NAMESPACE, 
parameters.getNamespace());
+        assertEquals(" tbl ", parameters.getTable());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumeratorTest.java
 
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumeratorTest.java
index 0fffeec0cc..66a6df90dd 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumeratorTest.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumeratorTest.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.hbase.source;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
 import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionLocator;
@@ -43,7 +44,9 @@ import static 
org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -57,7 +60,7 @@ public class HbaseSourceSplitEnumeratorTest {
 
     @Mock private RegionLocator regionLocator;
 
-    @Mock private HbaseParameters hbaseParameters;
+    private HbaseParameters hbaseParameters;
 
     private HbaseSourceSplitEnumerator enumerator;
 
@@ -65,13 +68,23 @@ public class HbaseSourceSplitEnumeratorTest {
     void setUp() throws IOException {
         MockitoAnnotations.openMocks(this);
 
-        when(hbaseParameters.getTable()).thenReturn("test_table");
-        
when(hbaseParameters.getZookeeperQuorum()).thenReturn("127.0.0.1:2801");
-        when(hbaseParameters.isBinaryRowkey()).thenReturn(false);
-        when(hbaseParameters.getStartRowkey()).thenReturn("");
-        when(hbaseParameters.getEndRowkey()).thenReturn("");
+        hbaseParameters = createParameters(HbaseParameters.DEFAULT_NAMESPACE, 
false, "", "");
         enumerator = new HbaseSourceSplitEnumerator(context, hbaseParameters, 
hbaseClient);
-        
when(hbaseClient.getRegionLocator("test_table")).thenReturn(regionLocator);
+        when(hbaseClient.tableExists(anyString())).thenReturn(true);
+        when(hbaseClient.getRegionLocator(HbaseParameters.DEFAULT_NAMESPACE, 
"test_table"))
+                .thenReturn(regionLocator);
+    }
+
+    private HbaseParameters createParameters(
+            String namespace, boolean isBinaryRowkey, String startRowkey, 
String endRowkey) {
+        return HbaseParameters.builder()
+                .namespace(namespace)
+                .table("test_table")
+                .zookeeperQuorum("127.0.0.1:2801")
+                .isBinaryRowkey(isBinaryRowkey)
+                .startRowkey(startRowkey)
+                .endRowkey(endRowkey)
+                .build();
     }
 
     @Test
@@ -93,6 +106,49 @@ public class HbaseSourceSplitEnumeratorTest {
         assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, split.getEndRow());
     }
 
+    @Test
+    void testGetTableSplitsWithBlankNamespaceUsesDefault() throws IOException {
+        HbaseParameters blankNamespaceParameters = createParameters("", false, 
"", "");
+        byte[][] startKeys = {HConstants.EMPTY_BYTE_ARRAY};
+        byte[][] endKeys = {HConstants.EMPTY_BYTE_ARRAY};
+        when(regionLocator.getStartKeys()).thenReturn(startKeys);
+        when(regionLocator.getEndKeys()).thenReturn(endKeys);
+
+        HbaseSourceSplitEnumerator enumeratorWithBlankNamespace =
+                new HbaseSourceSplitEnumerator(context, 
blankNamespaceParameters, hbaseClient);
+        Set<HbaseSourceSplit> splits = 
enumeratorWithBlankNamespace.getTableSplits();
+
+        assertNotNull(splits);
+        assertEquals(1, splits.size());
+        verify(hbaseClient, times(1))
+                .getRegionLocator(HbaseParameters.DEFAULT_NAMESPACE, 
"test_table");
+    }
+
+    @Test
+    void testGetTableSplitsWithTableNotExists() {
+        when(hbaseClient.tableExists(anyString())).thenReturn(false);
+
+        assertThrows(HbaseConnectorException.class, () -> 
enumerator.getTableSplits());
+    }
+
+    @Test
+    void testGetTableSplitsWithNoRegionInfo() throws IOException {
+        when(regionLocator.getStartKeys()).thenReturn(new byte[0][]);
+        when(regionLocator.getEndKeys()).thenReturn(new byte[0][]);
+
+        assertThrows(HbaseConnectorException.class, () -> 
enumerator.getTableSplits());
+    }
+
+    @Test
+    void testGetTableSplitsWrapsIOExceptionAsHbaseConnectorException() throws 
IOException {
+        when(hbaseClient.getRegionLocator(HbaseParameters.DEFAULT_NAMESPACE, 
"test_table"))
+                .thenThrow(new IOException("region locator error"));
+
+        HbaseConnectorException exception =
+                assertThrows(HbaseConnectorException.class, () -> 
enumerator.getTableSplits());
+        assertTrue(exception.getCause() instanceof IOException);
+    }
+
     @Test
     void testGetTableSplitsWithUserDefinedRowKeyRange() throws IOException {
         // Simulate a table with 4 regions but user only wants data from 
"row100" to "row300"
@@ -111,10 +167,12 @@ public class HbaseSourceSplitEnumeratorTest {
 
         when(regionLocator.getStartKeys()).thenReturn(startKeys);
         when(regionLocator.getEndKeys()).thenReturn(endKeys);
-        when(hbaseParameters.getStartRowkey()).thenReturn("row100");
-        when(hbaseParameters.getEndRowkey()).thenReturn("row300");
+        HbaseParameters parametersWithRowkeyRange =
+                createParameters(HbaseParameters.DEFAULT_NAMESPACE, false, 
"row100", "row300");
+        HbaseSourceSplitEnumerator enumeratorWithRowkeyRange =
+                new HbaseSourceSplitEnumerator(context, 
parametersWithRowkeyRange, hbaseClient);
 
-        Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+        Set<HbaseSourceSplit> splits = 
enumeratorWithRowkeyRange.getTableSplits();
 
         assertNotNull(splits);
         assertEquals(2, splits.size()); // Should only include regions 1 and 2
@@ -145,11 +203,16 @@ public class HbaseSourceSplitEnumeratorTest {
 
         when(regionLocator.getStartKeys()).thenReturn(startKeys);
         when(regionLocator.getEndKeys()).thenReturn(endKeys);
-        when(hbaseParameters.isBinaryRowkey()).thenReturn(true);
-        when(hbaseParameters.getStartRowkey()).thenReturn("\\x01\\x01\\x01");
-        when(hbaseParameters.getEndRowkey()).thenReturn("\\x02\\x02\\x02");
+        HbaseParameters binaryRowkeyParameters =
+                createParameters(
+                        HbaseParameters.DEFAULT_NAMESPACE,
+                        true,
+                        "\\x01\\x01\\x01",
+                        "\\x02\\x02\\x02");
+        HbaseSourceSplitEnumerator enumeratorWithBinaryRowkey =
+                new HbaseSourceSplitEnumerator(context, 
binaryRowkeyParameters, hbaseClient);
 
-        Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+        Set<HbaseSourceSplit> splits = 
enumeratorWithBinaryRowkey.getTableSplits();
 
         assertNotNull(splits);
         assertEquals(2, splits.size());
@@ -166,10 +229,12 @@ public class HbaseSourceSplitEnumeratorTest {
 
         when(regionLocator.getStartKeys()).thenReturn(startKeys);
         when(regionLocator.getEndKeys()).thenReturn(endKeys);
-        when(hbaseParameters.getStartRowkey()).thenReturn("row10");
-        when(hbaseParameters.getEndRowkey()).thenReturn("row15");
+        HbaseParameters parametersWithRowkeyRange =
+                createParameters(HbaseParameters.DEFAULT_NAMESPACE, false, "row10", "row15");
+        HbaseSourceSplitEnumerator enumeratorWithRowkeyRange =
+                new HbaseSourceSplitEnumerator(context, parametersWithRowkeyRange, hbaseClient);
 
-        Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+        Set<HbaseSourceSplit> splits = enumeratorWithRowkeyRange.getTableSplits();
 
         assertNotNull(splits);
         assertEquals(1, splits.size()); // Should include the first region
@@ -191,10 +256,12 @@ public class HbaseSourceSplitEnumeratorTest {
 
         when(regionLocator.getStartKeys()).thenReturn(startKeys);
         when(regionLocator.getEndKeys()).thenReturn(endKeys);
-        when(hbaseParameters.getStartRowkey()).thenReturn("row500");
-        when(hbaseParameters.getEndRowkey()).thenReturn("row600");
+        HbaseParameters parametersWithRowkeyRange =
+                createParameters(HbaseParameters.DEFAULT_NAMESPACE, false, "row500", "row600");
+        HbaseSourceSplitEnumerator enumeratorWithRowkeyRange =
+                new HbaseSourceSplitEnumerator(context, parametersWithRowkeyRange, hbaseClient);
 
-        Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+        Set<HbaseSourceSplit> splits = enumeratorWithRowkeyRange.getTableSplits();
 
         assertNotNull(splits);
         assertEquals(1, splits.size()); // Should include the last region
@@ -216,9 +283,12 @@ public class HbaseSourceSplitEnumeratorTest {
 
         when(regionLocator.getStartKeys()).thenReturn(startKeys);
         when(regionLocator.getEndKeys()).thenReturn(endKeys);
-        when(hbaseParameters.getStartRowkey()).thenReturn("row150");
+        HbaseParameters parametersWithStartRowkey =
+                createParameters(HbaseParameters.DEFAULT_NAMESPACE, false, "row150", "");
+        HbaseSourceSplitEnumerator enumeratorWithStartRowkey =
+                new HbaseSourceSplitEnumerator(context, parametersWithStartRowkey, hbaseClient);
 
-        Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+        Set<HbaseSourceSplit> splits = enumeratorWithStartRowkey.getTableSplits();
 
         assertNotNull(splits);
         assertEquals(2, splits.size()); // Should include regions 1 and 2
@@ -251,9 +321,12 @@ public class HbaseSourceSplitEnumeratorTest {
 
         when(regionLocator.getStartKeys()).thenReturn(startKeys);
         when(regionLocator.getEndKeys()).thenReturn(endKeys);
-        when(hbaseParameters.getEndRowkey()).thenReturn("row150");
+        HbaseParameters parametersWithEndRowkey =
+                createParameters(HbaseParameters.DEFAULT_NAMESPACE, false, "", "row150");
+        HbaseSourceSplitEnumerator enumeratorWithEndRowkey =
+                new HbaseSourceSplitEnumerator(context, parametersWithEndRowkey, hbaseClient);
 
-        Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+        Set<HbaseSourceSplit> splits = enumeratorWithEndRowkey.getTableSplits();
 
         assertNotNull(splits);
         assertEquals(2, splits.size()); // Should include regions 0 and 1
@@ -291,9 +364,12 @@ public class HbaseSourceSplitEnumeratorTest {
 
         when(regionLocator.getStartKeys()).thenReturn(startKeys);
         when(regionLocator.getEndKeys()).thenReturn(endKeys);
-        when(hbaseParameters.getStartRowkey()).thenReturn("row100");
+        HbaseParameters parametersWithStartRowkey =
+                createParameters(HbaseParameters.DEFAULT_NAMESPACE, false, "row100", "");
+        HbaseSourceSplitEnumerator enumeratorWithStartRowkey =
+                new HbaseSourceSplitEnumerator(context, parametersWithStartRowkey, hbaseClient);
 
-        Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+        Set<HbaseSourceSplit> splits = enumeratorWithStartRowkey.getTableSplits();
 
         assertNotNull(splits);
         assertEquals(3, splits.size());
@@ -334,9 +410,12 @@ public class HbaseSourceSplitEnumeratorTest {
 
         when(regionLocator.getStartKeys()).thenReturn(startKeys);
         when(regionLocator.getEndKeys()).thenReturn(endKeys);
-        when(hbaseParameters.getEndRowkey()).thenReturn("row200");
+        HbaseParameters parametersWithEndRowkey =
+                createParameters(HbaseParameters.DEFAULT_NAMESPACE, false, "", "row200");
+        HbaseSourceSplitEnumerator enumeratorWithEndRowkey =
+                new HbaseSourceSplitEnumerator(context, parametersWithEndRowkey, hbaseClient);
 
-        Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+        Set<HbaseSourceSplit> splits = enumeratorWithEndRowkey.getTableSplits();
 
         assertNotNull(splits);
         assertEquals(2, splits.size());
@@ -373,10 +452,12 @@ public class HbaseSourceSplitEnumeratorTest {
 
         when(regionLocator.getStartKeys()).thenReturn(startKeys);
         when(regionLocator.getEndKeys()).thenReturn(endKeys);
-        when(hbaseParameters.getStartRowkey()).thenReturn("row100");
-        when(hbaseParameters.getEndRowkey()).thenReturn("row200");
+        HbaseParameters parametersWithRowkeyRange =
+                createParameters(HbaseParameters.DEFAULT_NAMESPACE, false, "row100", "row200");
+        HbaseSourceSplitEnumerator enumeratorWithRowkeyRange =
+                new HbaseSourceSplitEnumerator(context, parametersWithRowkeyRange, hbaseClient);
 
-        Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+        Set<HbaseSourceSplit> splits = enumeratorWithRowkeyRange.getTableSplits();
 
         assertNotNull(splits);
         assertEquals(1, splits.size());
@@ -459,7 +540,8 @@ public class HbaseSourceSplitEnumeratorTest {
         enumerator.registerReader(0);
         enumerator.registerReader(1);
 
-        verify(hbaseClient, times(1)).getRegionLocator("test_table");
+        verify(hbaseClient, times(1))
+                .getRegionLocator(HbaseParameters.DEFAULT_NAMESPACE, "test_table");
         assertEquals(0, enumerator.currentUnassignedSplitSize());
     }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index 1aa6951f89..a8bf71f369 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -35,6 +35,7 @@ import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.groovy.util.Maps;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
@@ -76,6 +77,10 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
 
     private static final String ASSIGN_CF_TABLE_NAME = "assign_cf_table";
 
+    private static final String TEST_NAMESPACE = "test";
+
+    private static final String NAMESPACE_TABLE_NAME = "seatunnel_test_namespace";
+
     private static final String MULTI_TABLE_ONE_NAME = "hbase_sink_1";
 
     private static final String MULTI_TABLE_TWO_NAME = "hbase_sink_2";
@@ -90,6 +95,7 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
 
     private TableName table;
     private TableName tableAssign;
+    private TableName namespaceTable;
     private TableName binaryRowkeyTable;
 
     private HbaseCluster hbaseCluster;
@@ -113,6 +119,14 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
         hbaseCluster.createTable(BINARY_ROWKEY_TABLE_NAME, Arrays.asList(FAMILY_NAME));
         binaryRowkeyTable = TableName.valueOf(BINARY_ROWKEY_TABLE_NAME);
 
+        if (Arrays.stream(admin.listNamespaceDescriptors())
+                .noneMatch(descriptor -> TEST_NAMESPACE.equals(descriptor.getName()))) {
+            admin.createNamespace(NamespaceDescriptor.create(TEST_NAMESPACE).build());
+        }
+        namespaceTable = TableName.valueOf(TEST_NAMESPACE, NAMESPACE_TABLE_NAME);
+        dropTable(namespaceTable);
+        hbaseCluster.createTable(namespaceTable.getNameAsString(), Arrays.asList(FAMILY_NAME));
+
         // Create table for hbase multi-table sink test
         hbaseCluster.createTable(MULTI_TABLE_ONE_NAME, Arrays.asList(FAMILY_NAME));
         hbaseCluster.createTable(MULTI_TABLE_TWO_NAME, Arrays.asList(FAMILY_NAME));
@@ -436,6 +450,16 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, sourceExecResult.getExitCode());
     }
 
+    @TestTemplate
+    public void testHbaseSourceWithNamespace(TestContainer container)
+            throws IOException, InterruptedException {
+        deleteData(namespaceTable);
+        insertData(namespaceTable);
+        Container.ExecResult sourceExecResult =
+                container.executeJob("/hbase-source-with-namespace.conf");
+        Assertions.assertEquals(0, sourceExecResult.getExitCode());
+    }
+
     @TestTemplate
     public void testHbaseSourceWithTimeRange(TestContainer container)
             throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-namespace.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-namespace.conf
new file mode 100644
index 0000000000..602f64ddf1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-namespace.conf
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hbase {
+    zookeeper_quorum = "hbase_e2e:2181"
+    table = "test:seatunnel_test_namespace"
+    query_columns = ["rowkey", "info:name"]
+    caching = 1000
+    batch = 100
+    cache_blocks = false
+    schema = {
+      columns = [
+        {
+          name = rowkey
+          type = string
+        },
+        {
+          name = "info:name"
+          type = string
+        }
+      ]
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = "info:name"
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
+


Reply via email to