This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d722474bc3 [Fix] [connector-hbase] Fix namespace handling for HBase
source (#10295)
d722474bc3 is described below
commit d722474bc36ce52735fff1b81af5527ef0bd44ca
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Jan 28 10:08:56 2026 +0800
[Fix] [connector-hbase] Fix namespace handling for HBase source (#10295)
Co-authored-by: zengyi <[email protected]>
---
docs/en/connectors/sink/Hbase.md | 1 +
docs/en/connectors/source/Hbase.md | 1 +
docs/zh/connectors/sink/Hbase.md | 1 +
docs/zh/connectors/source/Hbase.md | 1 +
.../seatunnel/hbase/client/HbaseClient.java | 13 +-
.../seatunnel/hbase/config/HbaseParameters.java | 14 +-
.../hbase/source/HbaseSourceSplitEnumerator.java | 150 +++++++++++++--------
.../hbase/config/HbaseParametersTest.java | 95 +++++++++++++
.../source/HbaseSourceSplitEnumeratorTest.java | 146 +++++++++++++++-----
.../seatunnel/e2e/connector/hbase/HbaseIT.java | 24 ++++
.../resources/hbase-source-with-namespace.conf | 73 ++++++++++
11 files changed, 430 insertions(+), 89 deletions(-)
diff --git a/docs/en/connectors/sink/Hbase.md b/docs/en/connectors/sink/Hbase.md
index 9d4d40e60e..104fde4955 100644
--- a/docs/en/connectors/sink/Hbase.md
+++ b/docs/en/connectors/sink/Hbase.md
@@ -37,6 +37,7 @@ The zookeeper cluster host of hbase, example:
"hadoop001:2181,hadoop002:2181,had
### table [string]
The table name you want to write, example: "seatunnel"
+If your table is under a custom namespace, use `namespace:table` (for example,
`ns1:seatunnel_test`); if the namespace is omitted, SeaTunnel writes to HBase's
default namespace (`default`).
### rowkey_column [list]
diff --git a/docs/en/connectors/source/Hbase.md
b/docs/en/connectors/source/Hbase.md
index dae2873292..8c837b670e 100644
--- a/docs/en/connectors/source/Hbase.md
+++ b/docs/en/connectors/source/Hbase.md
@@ -44,6 +44,7 @@ The zookeeper quorum for Hbase cluster hosts, e.g.,
"hadoop001:2181,hadoop002:21
### table [string]
The name of the table to write to, e.g., "seatunnel".
+If your table lives in a custom namespace, use the `namespace:table` form (for
example, `ns1:seatunnel_test`); when the namespace is omitted, SeaTunnel reads
from HBase's default namespace (`default`).
### schema [config]
diff --git a/docs/zh/connectors/sink/Hbase.md b/docs/zh/connectors/sink/Hbase.md
index fc4c75e9c0..f42916c5e0 100644
--- a/docs/zh/connectors/sink/Hbase.md
+++ b/docs/zh/connectors/sink/Hbase.md
@@ -37,6 +37,7 @@ hbase的zookeeper集群主机, 示例:
"hadoop001:2181,hadoop002:2181,hadoop003:
### table [string]
要写入的表名, 例如: "seatunnel"
+如果表在自定义 namespace 下,请使用 `namespace:table` 形式(如 `ns1:seatunnel_test`);未填写
namespace 时,SeaTunnel 会写入到 HBase 默认命名空间 `default`。
### rowkey_column [list]
diff --git a/docs/zh/connectors/source/Hbase.md
b/docs/zh/connectors/source/Hbase.md
index aabfc37380..b115e42dc8 100644
--- a/docs/zh/connectors/source/Hbase.md
+++ b/docs/zh/connectors/source/Hbase.md
@@ -44,6 +44,7 @@ hbase的zookeeper集群主机,例如:“hadoop001:2181,hadoop002:2181,hadoop
### table [string]
要写入的表名,例如:“seatunnel”
+如果表在自定义 namespace 下,请使用 `namespace:table` 形式(如 `ns1:seatunnel_test`);未填写
namespace 时,SeaTunnel 会使用 HBase 的默认命名空间 `default`。
### schema [config]
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
index 70c393f799..64a5cbb102 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
@@ -368,7 +368,9 @@ public class HbaseClient {
throws IOException {
Scan scan = buildScan(split, hbaseParameters, columnNames);
return this.connection
- .getTable(TableName.valueOf(hbaseParameters.getTable()))
+ .getTable(
+ TableName.valueOf(
+ hbaseParameters.getNamespace(),
hbaseParameters.getTable()))
.getScanner(scan);
}
@@ -416,11 +418,18 @@ public class HbaseClient {
/**
* Get a RegionLocator.
*
- * @param tableName table name
+ * @param tableName table name (preferably fully qualified as {@code
namespace:table})
* @return RegionLocator
* @throws IOException exception
+ * @deprecated Use {@link #getRegionLocator(String, String)} instead to
avoid relying on the
+ * default namespace behavior.
*/
+ @Deprecated
public RegionLocator getRegionLocator(String tableName) throws IOException
{
return this.connection.getRegionLocator(TableName.valueOf(tableName));
}
+
+ public RegionLocator getRegionLocator(String namespace, String tableName)
throws IOException {
+ return this.connection.getRegionLocator(TableName.valueOf(namespace,
tableName));
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index a6e6a04e77..7068c4a6e5 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -19,6 +19,8 @@ package
org.apache.seatunnel.connectors.seatunnel.hbase.config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+
import lombok.Builder;
import lombok.Getter;
@@ -30,6 +32,8 @@ import java.util.Map;
@Getter
public class HbaseParameters implements Serializable {
+ public static final String DEFAULT_NAMESPACE =
NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
+
private String zookeeperQuorum;
private String namespace;
@@ -95,7 +99,7 @@ public class HbaseParameters implements Serializable {
builder.table(table.substring(colonIndex + 1));
} else {
builder.table(table);
- builder.namespace("default");
+ builder.namespace(DEFAULT_NAMESPACE);
}
// required parameters
@@ -129,6 +133,7 @@ public class HbaseParameters implements Serializable {
builder.table(table.substring(colonIndex + 1));
} else {
builder.table(table);
+ builder.namespace(DEFAULT_NAMESPACE);
}
if
(pluginConfig.getOptional(HbaseSinkOptions.HBASE_EXTRA_CONFIG).isPresent()) {
@@ -168,4 +173,11 @@ public class HbaseParameters implements Serializable {
}
return builder.build();
}
+
+ public String getNamespace() {
+ if (namespace == null || namespace.trim().isEmpty()) {
+ return DEFAULT_NAMESPACE;
+ }
+ return namespace;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
index 54306ef6ec..744130240a 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java
@@ -23,9 +23,12 @@ import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTestin
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.hbase.util.HBaseUtil;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
@@ -59,14 +62,14 @@ public class HbaseSourceSplitEnumerator
public HbaseSourceSplitEnumerator(
Context<HbaseSourceSplit> context, HbaseParameters
hbaseParameters) {
- this(context, hbaseParameters, new HashSet<>());
+ this(context, hbaseParameters, new HashSet<>(), null);
}
public HbaseSourceSplitEnumerator(
Context<HbaseSourceSplit> context,
HbaseParameters hbaseParameters,
HbaseSourceState sourceState) {
- this(context, hbaseParameters, sourceState.getAssignedSplits());
+ this(context, hbaseParameters, sourceState.getAssignedSplits(), null);
}
@VisibleForTesting
@@ -90,7 +93,7 @@ public class HbaseSourceSplitEnumerator
Context<HbaseSourceSplit> context,
HbaseParameters hbaseParameters,
Set<HbaseSourceSplit> assignedSplit) {
- this(context, hbaseParameters, assignedSplit,
HbaseClient.createInstance(hbaseParameters));
+ this(context, hbaseParameters, assignedSplit, null);
}
private HbaseSourceSplitEnumerator(
@@ -216,64 +219,103 @@ public class HbaseSourceSplitEnumerator
@VisibleForTesting
public Set<HbaseSourceSplit> getTableSplits() {
-
+ String namespace = hbaseParameters.getNamespace();
+ TableName tableName = TableName.valueOf(namespace,
hbaseParameters.getTable());
try {
- RegionLocator regionLocator =
hbaseClient.getRegionLocator(hbaseParameters.getTable());
- byte[][] startKeys = regionLocator.getStartKeys();
- byte[][] endKeys = regionLocator.getEndKeys();
- List<HbaseSourceSplit> splits = new ArrayList<>();
- boolean isBinaryRowkey = hbaseParameters.isBinaryRowkey();
- byte[] userStartRowkey =
- HBaseUtil.convertRowKey(hbaseParameters.getStartRowkey(),
isBinaryRowkey);
- byte[] userEndRowkey =
- HBaseUtil.convertRowKey(hbaseParameters.getEndRowkey(),
isBinaryRowkey);
- HBaseUtil.validateRowKeyRange(userStartRowkey, userEndRowkey);
-
- int i = 0;
- while (i < startKeys.length) {
- byte[] regionStartKey = startKeys[i];
- byte[] regionEndKey = endKeys[i];
- if (userEndRowkey.length > 0
- && Bytes.compareTo(userEndRowkey, regionStartKey) <= 0
- && Bytes.compareTo(regionStartKey,
HConstants.EMPTY_BYTE_ARRAY) != 0) {
- i++;
- continue;
- }
+ HbaseClient hbaseClient = getHbaseClient();
+ log.info("Enumerating HBase source splits for table [{}]",
tableName.getNameAsString());
+ if (!hbaseClient.tableExists(tableName.getNameAsString())) {
+ String errorMsg =
+ String.format(
+ "HBase table [%s] does not exist",
tableName.getNameAsString());
+ log.error(errorMsg);
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION,
errorMsg);
+ }
- if (userStartRowkey.length > 0
- && Bytes.compareTo(userStartRowkey, regionEndKey) >= 0
- && Bytes.compareTo(regionEndKey,
HConstants.EMPTY_BYTE_ARRAY) != 0) {
+ try (RegionLocator regionLocator =
+ hbaseClient.getRegionLocator(namespace,
hbaseParameters.getTable())) {
+ byte[][] startKeys = regionLocator.getStartKeys();
+ byte[][] endKeys = regionLocator.getEndKeys();
+ if (startKeys.length == 0 || endKeys.length == 0) {
+ String errorMsg =
+ String.format(
+ "No region information found for HBase
table [%s], please check whether the table exists "
+ + "and current user has permission
to access it",
+ tableName.getNameAsString());
+ log.error(errorMsg);
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION,
errorMsg);
+ }
+ List<HbaseSourceSplit> splits = new ArrayList<>();
+ boolean isBinaryRowkey = hbaseParameters.isBinaryRowkey();
+ byte[] userStartRowkey =
+
HBaseUtil.convertRowKey(hbaseParameters.getStartRowkey(), isBinaryRowkey);
+ byte[] userEndRowkey =
+
HBaseUtil.convertRowKey(hbaseParameters.getEndRowkey(), isBinaryRowkey);
+ HBaseUtil.validateRowKeyRange(userStartRowkey, userEndRowkey);
+
+ int i = 0;
+ while (i < startKeys.length) {
+ byte[] regionStartKey = startKeys[i];
+ byte[] regionEndKey = endKeys[i];
+ if (userEndRowkey.length > 0
+ && Bytes.compareTo(userEndRowkey, regionStartKey)
<= 0
+ && Bytes.compareTo(regionStartKey,
HConstants.EMPTY_BYTE_ARRAY) != 0) {
+ i++;
+ continue;
+ }
+
+ if (userStartRowkey.length > 0
+ && Bytes.compareTo(userStartRowkey, regionEndKey)
>= 0
+ && Bytes.compareTo(regionEndKey,
HConstants.EMPTY_BYTE_ARRAY) != 0) {
+ i++;
+ continue;
+ }
+ byte[] splitStartKey =
+ userStartRowkey.length > 0
+ && (Bytes.compareTo(
+
regionStartKey,
+
HConstants.EMPTY_BYTE_ARRAY)
+ == 0
+ || Bytes.compareTo(
+
userStartRowkey, regionStartKey)
+ > 0)
+ ? userStartRowkey
+ : regionStartKey;
+
+ byte[] splitEndKey =
+ userEndRowkey.length > 0
+ && (Bytes.compareTo(
+
regionEndKey,
+
HConstants.EMPTY_BYTE_ARRAY)
+ == 0
+ ||
Bytes.compareTo(userEndRowkey, regionEndKey)
+ < 0)
+ ? userEndRowkey
+ : regionEndKey;
+
+ splits.add(new HbaseSourceSplit(i, splitStartKey,
splitEndKey));
i++;
- continue;
}
- byte[] splitStartKey =
- userStartRowkey.length > 0
- && (Bytes.compareTo(
- regionStartKey,
-
HConstants.EMPTY_BYTE_ARRAY)
- == 0
- ||
Bytes.compareTo(userStartRowkey, regionStartKey)
- > 0)
- ? userStartRowkey
- : regionStartKey;
-
- byte[] splitEndKey =
- userEndRowkey.length > 0
- && (Bytes.compareTo(
- regionEndKey,
-
HConstants.EMPTY_BYTE_ARRAY)
- == 0
- ||
Bytes.compareTo(userEndRowkey, regionEndKey) < 0)
- ? userEndRowkey
- : regionEndKey;
-
- splits.add(new HbaseSourceSplit(i, splitStartKey,
splitEndKey));
- i++;
+ return new HashSet<>(splits);
}
- return new HashSet<>(splits);
} catch (IOException e) {
- throw new RuntimeException(e);
+ String errorMsg =
+ String.format(
+ "Failed to enumerate splits for HBase table [%s]",
+ tableName.getNameAsString());
+ log.error(errorMsg, e);
+ throw new HbaseConnectorException(
+ HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION, errorMsg,
e);
+ }
+ }
+
+ private synchronized HbaseClient getHbaseClient() {
+ if (hbaseClient == null) {
+ hbaseClient = HbaseClient.createInstance(hbaseParameters);
}
+ return hbaseClient;
}
/** Hash algorithm for assigning splits to readers */
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParametersTest.java
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParametersTest.java
index 34cf23aa34..c0ce4912fe 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParametersTest.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParametersTest.java
@@ -28,6 +28,30 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class HbaseParametersTest {
+ @Test
+ void testBuildWithSourceConfigWithoutNamespace() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(),
"127.0.0.1:2181");
+ configMap.put(HbaseBaseOptions.TABLE.key(), "tbl");
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+
+ HbaseParameters parameters =
HbaseParameters.buildWithSourceConfig(readonlyConfig);
+ assertEquals(HbaseParameters.DEFAULT_NAMESPACE,
parameters.getNamespace());
+ assertEquals("tbl", parameters.getTable());
+ }
+
+ @Test
+ void testBuildWithSourceConfigWithNamespace() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(),
"127.0.0.1:2181");
+ configMap.put(HbaseBaseOptions.TABLE.key(), "test:tbl");
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+
+ HbaseParameters parameters =
HbaseParameters.buildWithSourceConfig(readonlyConfig);
+ assertEquals("test", parameters.getNamespace());
+ assertEquals("tbl", parameters.getTable());
+ }
+
@Test
void testBuildWithSourceConfigReadsTimeRange() {
Map<String, Object> config = new HashMap<>();
@@ -42,4 +66,75 @@ public class HbaseParametersTest {
assertEquals(1000L, parameters.getStartTimestamp());
assertEquals(2000L, parameters.getEndTimestamp());
}
+
+ @Test
+ void testGetNamespaceReturnsDefaultWhenNull() {
+ HbaseParameters parameters =
+ HbaseParameters.builder()
+ .namespace(null)
+ .table("tbl")
+ .zookeeperQuorum("127.0.0.1:2181")
+ .build();
+ assertEquals(HbaseParameters.DEFAULT_NAMESPACE,
parameters.getNamespace());
+ }
+
+ @Test
+ void testBuildWithSourceConfigWithLeadingColonUsesDefaultNamespace() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(),
"127.0.0.1:2181");
+ configMap.put(HbaseBaseOptions.TABLE.key(), ":tbl");
+
+ HbaseParameters parameters =
+
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
+ assertEquals(HbaseParameters.DEFAULT_NAMESPACE,
parameters.getNamespace());
+ assertEquals("tbl", parameters.getTable());
+ }
+
+ @Test
+ void testBuildWithSourceConfigWithMultipleColons() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(),
"127.0.0.1:2181");
+ configMap.put(HbaseBaseOptions.TABLE.key(), "ns:tbl:extra");
+
+ HbaseParameters parameters =
+
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
+ assertEquals("ns", parameters.getNamespace());
+ assertEquals("tbl:extra", parameters.getTable());
+ }
+
+ @Test
+ void testBuildWithSourceConfigWithSpaces() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(),
"127.0.0.1:2181");
+ configMap.put(HbaseBaseOptions.TABLE.key(), " ns : tbl ");
+
+ HbaseParameters parameters =
+
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
+ assertEquals(" ns ", parameters.getNamespace());
+ assertEquals(" tbl ", parameters.getTable());
+ }
+
+ @Test
+ void testBuildWithSourceConfigWithEmptyTableName() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(),
"127.0.0.1:2181");
+ configMap.put(HbaseBaseOptions.TABLE.key(), "test:");
+
+ HbaseParameters parameters =
+
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
+ assertEquals("test", parameters.getNamespace());
+ assertEquals("", parameters.getTable());
+ }
+
+ @Test
+ void testBuildWithSourceConfigWithoutNamespaceKeepsSpacesInTableName() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(),
"127.0.0.1:2181");
+ configMap.put(HbaseBaseOptions.TABLE.key(), " tbl ");
+
+ HbaseParameters parameters =
+
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
+ assertEquals(HbaseParameters.DEFAULT_NAMESPACE,
parameters.getNamespace());
+ assertEquals(" tbl ", parameters.getTable());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumeratorTest.java
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumeratorTest.java
index 0fffeec0cc..66a6df90dd 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumeratorTest.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumeratorTest.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.hbase.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+import
org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionLocator;
@@ -43,7 +44,9 @@ import static
org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -57,7 +60,7 @@ public class HbaseSourceSplitEnumeratorTest {
@Mock private RegionLocator regionLocator;
- @Mock private HbaseParameters hbaseParameters;
+ private HbaseParameters hbaseParameters;
private HbaseSourceSplitEnumerator enumerator;
@@ -65,13 +68,23 @@ public class HbaseSourceSplitEnumeratorTest {
void setUp() throws IOException {
MockitoAnnotations.openMocks(this);
- when(hbaseParameters.getTable()).thenReturn("test_table");
-
when(hbaseParameters.getZookeeperQuorum()).thenReturn("127.0.0.1:2801");
- when(hbaseParameters.isBinaryRowkey()).thenReturn(false);
- when(hbaseParameters.getStartRowkey()).thenReturn("");
- when(hbaseParameters.getEndRowkey()).thenReturn("");
+ hbaseParameters = createParameters(HbaseParameters.DEFAULT_NAMESPACE,
false, "", "");
enumerator = new HbaseSourceSplitEnumerator(context, hbaseParameters,
hbaseClient);
-
when(hbaseClient.getRegionLocator("test_table")).thenReturn(regionLocator);
+ when(hbaseClient.tableExists(anyString())).thenReturn(true);
+ when(hbaseClient.getRegionLocator(HbaseParameters.DEFAULT_NAMESPACE,
"test_table"))
+ .thenReturn(regionLocator);
+ }
+
+ private HbaseParameters createParameters(
+ String namespace, boolean isBinaryRowkey, String startRowkey,
String endRowkey) {
+ return HbaseParameters.builder()
+ .namespace(namespace)
+ .table("test_table")
+ .zookeeperQuorum("127.0.0.1:2801")
+ .isBinaryRowkey(isBinaryRowkey)
+ .startRowkey(startRowkey)
+ .endRowkey(endRowkey)
+ .build();
}
@Test
@@ -93,6 +106,49 @@ public class HbaseSourceSplitEnumeratorTest {
assertArrayEquals(HConstants.EMPTY_BYTE_ARRAY, split.getEndRow());
}
+ @Test
+ void testGetTableSplitsWithBlankNamespaceUsesDefault() throws IOException {
+ HbaseParameters blankNamespaceParameters = createParameters("", false,
"", "");
+ byte[][] startKeys = {HConstants.EMPTY_BYTE_ARRAY};
+ byte[][] endKeys = {HConstants.EMPTY_BYTE_ARRAY};
+ when(regionLocator.getStartKeys()).thenReturn(startKeys);
+ when(regionLocator.getEndKeys()).thenReturn(endKeys);
+
+ HbaseSourceSplitEnumerator enumeratorWithBlankNamespace =
+ new HbaseSourceSplitEnumerator(context,
blankNamespaceParameters, hbaseClient);
+ Set<HbaseSourceSplit> splits =
enumeratorWithBlankNamespace.getTableSplits();
+
+ assertNotNull(splits);
+ assertEquals(1, splits.size());
+ verify(hbaseClient, times(1))
+ .getRegionLocator(HbaseParameters.DEFAULT_NAMESPACE,
"test_table");
+ }
+
+ @Test
+ void testGetTableSplitsWithTableNotExists() {
+ when(hbaseClient.tableExists(anyString())).thenReturn(false);
+
+ assertThrows(HbaseConnectorException.class, () ->
enumerator.getTableSplits());
+ }
+
+ @Test
+ void testGetTableSplitsWithNoRegionInfo() throws IOException {
+ when(regionLocator.getStartKeys()).thenReturn(new byte[0][]);
+ when(regionLocator.getEndKeys()).thenReturn(new byte[0][]);
+
+ assertThrows(HbaseConnectorException.class, () ->
enumerator.getTableSplits());
+ }
+
+ @Test
+ void testGetTableSplitsWrapsIOExceptionAsHbaseConnectorException() throws
IOException {
+ when(hbaseClient.getRegionLocator(HbaseParameters.DEFAULT_NAMESPACE,
"test_table"))
+ .thenThrow(new IOException("region locator error"));
+
+ HbaseConnectorException exception =
+ assertThrows(HbaseConnectorException.class, () ->
enumerator.getTableSplits());
+ assertTrue(exception.getCause() instanceof IOException);
+ }
+
@Test
void testGetTableSplitsWithUserDefinedRowKeyRange() throws IOException {
// Simulate a table with 4 regions but user only wants data from
"row100" to "row300"
@@ -111,10 +167,12 @@ public class HbaseSourceSplitEnumeratorTest {
when(regionLocator.getStartKeys()).thenReturn(startKeys);
when(regionLocator.getEndKeys()).thenReturn(endKeys);
- when(hbaseParameters.getStartRowkey()).thenReturn("row100");
- when(hbaseParameters.getEndRowkey()).thenReturn("row300");
+ HbaseParameters parametersWithRowkeyRange =
+ createParameters(HbaseParameters.DEFAULT_NAMESPACE, false,
"row100", "row300");
+ HbaseSourceSplitEnumerator enumeratorWithRowkeyRange =
+ new HbaseSourceSplitEnumerator(context,
parametersWithRowkeyRange, hbaseClient);
- Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+ Set<HbaseSourceSplit> splits =
enumeratorWithRowkeyRange.getTableSplits();
assertNotNull(splits);
assertEquals(2, splits.size()); // Should only include regions 1 and 2
@@ -145,11 +203,16 @@ public class HbaseSourceSplitEnumeratorTest {
when(regionLocator.getStartKeys()).thenReturn(startKeys);
when(regionLocator.getEndKeys()).thenReturn(endKeys);
- when(hbaseParameters.isBinaryRowkey()).thenReturn(true);
- when(hbaseParameters.getStartRowkey()).thenReturn("\\x01\\x01\\x01");
- when(hbaseParameters.getEndRowkey()).thenReturn("\\x02\\x02\\x02");
+ HbaseParameters binaryRowkeyParameters =
+ createParameters(
+ HbaseParameters.DEFAULT_NAMESPACE,
+ true,
+ "\\x01\\x01\\x01",
+ "\\x02\\x02\\x02");
+ HbaseSourceSplitEnumerator enumeratorWithBinaryRowkey =
+ new HbaseSourceSplitEnumerator(context,
binaryRowkeyParameters, hbaseClient);
- Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+ Set<HbaseSourceSplit> splits =
enumeratorWithBinaryRowkey.getTableSplits();
assertNotNull(splits);
assertEquals(2, splits.size());
@@ -166,10 +229,12 @@ public class HbaseSourceSplitEnumeratorTest {
when(regionLocator.getStartKeys()).thenReturn(startKeys);
when(regionLocator.getEndKeys()).thenReturn(endKeys);
- when(hbaseParameters.getStartRowkey()).thenReturn("row10");
- when(hbaseParameters.getEndRowkey()).thenReturn("row15");
+ HbaseParameters parametersWithRowkeyRange =
+ createParameters(HbaseParameters.DEFAULT_NAMESPACE, false,
"row10", "row15");
+ HbaseSourceSplitEnumerator enumeratorWithRowkeyRange =
+ new HbaseSourceSplitEnumerator(context,
parametersWithRowkeyRange, hbaseClient);
- Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+ Set<HbaseSourceSplit> splits =
enumeratorWithRowkeyRange.getTableSplits();
assertNotNull(splits);
assertEquals(1, splits.size()); // Should include the first region
@@ -191,10 +256,12 @@ public class HbaseSourceSplitEnumeratorTest {
when(regionLocator.getStartKeys()).thenReturn(startKeys);
when(regionLocator.getEndKeys()).thenReturn(endKeys);
- when(hbaseParameters.getStartRowkey()).thenReturn("row500");
- when(hbaseParameters.getEndRowkey()).thenReturn("row600");
+ HbaseParameters parametersWithRowkeyRange =
+ createParameters(HbaseParameters.DEFAULT_NAMESPACE, false,
"row500", "row600");
+ HbaseSourceSplitEnumerator enumeratorWithRowkeyRange =
+ new HbaseSourceSplitEnumerator(context,
parametersWithRowkeyRange, hbaseClient);
- Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+ Set<HbaseSourceSplit> splits =
enumeratorWithRowkeyRange.getTableSplits();
assertNotNull(splits);
assertEquals(1, splits.size()); // Should include the last region
@@ -216,9 +283,12 @@ public class HbaseSourceSplitEnumeratorTest {
when(regionLocator.getStartKeys()).thenReturn(startKeys);
when(regionLocator.getEndKeys()).thenReturn(endKeys);
- when(hbaseParameters.getStartRowkey()).thenReturn("row150");
+ HbaseParameters parametersWithStartRowkey =
+ createParameters(HbaseParameters.DEFAULT_NAMESPACE, false,
"row150", "");
+ HbaseSourceSplitEnumerator enumeratorWithStartRowkey =
+ new HbaseSourceSplitEnumerator(context,
parametersWithStartRowkey, hbaseClient);
- Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+ Set<HbaseSourceSplit> splits =
enumeratorWithStartRowkey.getTableSplits();
assertNotNull(splits);
assertEquals(2, splits.size()); // Should include regions 1 and 2
@@ -251,9 +321,12 @@ public class HbaseSourceSplitEnumeratorTest {
when(regionLocator.getStartKeys()).thenReturn(startKeys);
when(regionLocator.getEndKeys()).thenReturn(endKeys);
- when(hbaseParameters.getEndRowkey()).thenReturn("row150");
+ HbaseParameters parametersWithEndRowkey =
+ createParameters(HbaseParameters.DEFAULT_NAMESPACE, false, "",
"row150");
+ HbaseSourceSplitEnumerator enumeratorWithEndRowkey =
+ new HbaseSourceSplitEnumerator(context,
parametersWithEndRowkey, hbaseClient);
- Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+ Set<HbaseSourceSplit> splits =
enumeratorWithEndRowkey.getTableSplits();
assertNotNull(splits);
assertEquals(2, splits.size()); // Should include regions 0 and 1
@@ -291,9 +364,12 @@ public class HbaseSourceSplitEnumeratorTest {
when(regionLocator.getStartKeys()).thenReturn(startKeys);
when(regionLocator.getEndKeys()).thenReturn(endKeys);
- when(hbaseParameters.getStartRowkey()).thenReturn("row100");
+ HbaseParameters parametersWithStartRowkey =
+ createParameters(HbaseParameters.DEFAULT_NAMESPACE, false,
"row100", "");
+ HbaseSourceSplitEnumerator enumeratorWithStartRowkey =
+ new HbaseSourceSplitEnumerator(context,
parametersWithStartRowkey, hbaseClient);
- Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+ Set<HbaseSourceSplit> splits =
enumeratorWithStartRowkey.getTableSplits();
assertNotNull(splits);
assertEquals(3, splits.size());
@@ -334,9 +410,12 @@ public class HbaseSourceSplitEnumeratorTest {
when(regionLocator.getStartKeys()).thenReturn(startKeys);
when(regionLocator.getEndKeys()).thenReturn(endKeys);
- when(hbaseParameters.getEndRowkey()).thenReturn("row200");
+ HbaseParameters parametersWithEndRowkey =
+ createParameters(HbaseParameters.DEFAULT_NAMESPACE, false, "",
"row200");
+ HbaseSourceSplitEnumerator enumeratorWithEndRowkey =
+ new HbaseSourceSplitEnumerator(context,
parametersWithEndRowkey, hbaseClient);
- Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+ Set<HbaseSourceSplit> splits =
enumeratorWithEndRowkey.getTableSplits();
assertNotNull(splits);
assertEquals(2, splits.size());
@@ -373,10 +452,12 @@ public class HbaseSourceSplitEnumeratorTest {
when(regionLocator.getStartKeys()).thenReturn(startKeys);
when(regionLocator.getEndKeys()).thenReturn(endKeys);
- when(hbaseParameters.getStartRowkey()).thenReturn("row100");
- when(hbaseParameters.getEndRowkey()).thenReturn("row200");
+ HbaseParameters parametersWithRowkeyRange =
+ createParameters(HbaseParameters.DEFAULT_NAMESPACE, false,
"row100", "row200");
+ HbaseSourceSplitEnumerator enumeratorWithRowkeyRange =
+ new HbaseSourceSplitEnumerator(context,
parametersWithRowkeyRange, hbaseClient);
- Set<HbaseSourceSplit> splits = enumerator.getTableSplits();
+ Set<HbaseSourceSplit> splits =
enumeratorWithRowkeyRange.getTableSplits();
assertNotNull(splits);
assertEquals(1, splits.size());
@@ -459,7 +540,8 @@ public class HbaseSourceSplitEnumeratorTest {
enumerator.registerReader(0);
enumerator.registerReader(1);
- verify(hbaseClient, times(1)).getRegionLocator("test_table");
+ verify(hbaseClient, times(1))
+ .getRegionLocator(HbaseParameters.DEFAULT_NAMESPACE,
"test_table");
assertEquals(0, enumerator.currentUnassignedSplitSize());
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index 1aa6951f89..a8bf71f369 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -35,6 +35,7 @@ import
org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.groovy.util.Maps;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
@@ -76,6 +77,10 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
private static final String ASSIGN_CF_TABLE_NAME = "assign_cf_table";
+ private static final String TEST_NAMESPACE = "test";
+
+ private static final String NAMESPACE_TABLE_NAME = "seatunnel_test_namespace";
+
private static final String MULTI_TABLE_ONE_NAME = "hbase_sink_1";
private static final String MULTI_TABLE_TWO_NAME = "hbase_sink_2";
@@ -90,6 +95,7 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
private TableName table;
private TableName tableAssign;
+ private TableName namespaceTable;
private TableName binaryRowkeyTable;
private HbaseCluster hbaseCluster;
@@ -113,6 +119,14 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
hbaseCluster.createTable(BINARY_ROWKEY_TABLE_NAME, Arrays.asList(FAMILY_NAME));
binaryRowkeyTable = TableName.valueOf(BINARY_ROWKEY_TABLE_NAME);
+ if (Arrays.stream(admin.listNamespaceDescriptors())
+ .noneMatch(descriptor -> TEST_NAMESPACE.equals(descriptor.getName()))) {
+ admin.createNamespace(NamespaceDescriptor.create(TEST_NAMESPACE).build());
+ }
+ namespaceTable = TableName.valueOf(TEST_NAMESPACE, NAMESPACE_TABLE_NAME);
+ dropTable(namespaceTable);
+ hbaseCluster.createTable(namespaceTable.getNameAsString(), Arrays.asList(FAMILY_NAME));
+
// Create table for hbase multi-table sink test
hbaseCluster.createTable(MULTI_TABLE_ONE_NAME, Arrays.asList(FAMILY_NAME));
hbaseCluster.createTable(MULTI_TABLE_TWO_NAME, Arrays.asList(FAMILY_NAME));
@@ -436,6 +450,16 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
Assertions.assertEquals(0, sourceExecResult.getExitCode());
}
+ @TestTemplate
+ public void testHbaseSourceWithNamespace(TestContainer container)
+ throws IOException, InterruptedException {
+ deleteData(namespaceTable);
+ insertData(namespaceTable);
+ Container.ExecResult sourceExecResult =
+ container.executeJob("/hbase-source-with-namespace.conf");
+ Assertions.assertEquals(0, sourceExecResult.getExitCode());
+ }
+
@TestTemplate
public void testHbaseSourceWithTimeRange(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-namespace.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-namespace.conf
new file mode 100644
index 0000000000..602f64ddf1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-with-namespace.conf
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "test:seatunnel_test_namespace"
+ query_columns = ["rowkey", "info:name"]
+ caching = 1000
+ batch = 100
+ cache_blocks = false
+ schema = {
+ columns = [
+ {
+ name = rowkey
+ type = string
+ },
+ {
+ name = "info:name"
+ type = string
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = "info:name"
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
+