This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9d58bc01ac [Fix][Connector-V2][Hbase] Fix ERROR_WHEN_DATA_EXISTS NPE
on empty table (#10336)
9d58bc01ac is described below
commit 9d58bc01ac2f97a6febf4766f5b8a2cfd1ed803d
Author: yzeng1618 <[email protected]>
AuthorDate: Thu Jan 15 21:04:16 2026 +0800
[Fix][Connector-V2][Hbase] Fix ERROR_WHEN_DATA_EXISTS NPE on empty table
(#10336)
---
.../seatunnel/hbase/client/HbaseClient.java | 16 ++--
.../seatunnel/hbase/client/HbaseClientTest.java | 89 ++++++++++++++++++++++
.../seatunnel/e2e/connector/hbase/HbaseIT.java | 11 +++
3 files changed, 107 insertions(+), 9 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
index 3b169ae0e4..b034736028 100644
---
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
+++
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
@@ -310,15 +310,13 @@ public class HbaseClient {
* @return true if the table has data, false otherwise
*/
public boolean isExistsData(String databaseName, String tableName) {
- try {
- Table table = connection.getTable(TableName.valueOf(databaseName,
tableName));
- Scan scan = new Scan();
- scan.setCaching(1);
- scan.setLimit(1);
- try (ResultScanner scanner = table.getScanner(scan)) {
- Result result = scanner.next();
- return !result.isEmpty();
- }
+ Scan scan = new Scan();
+ scan.setCaching(1);
+ scan.setLimit(1);
+ try (Table table = connection.getTable(TableName.valueOf(databaseName,
tableName));
+ ResultScanner scanner = table.getScanner(scan)) {
+ Result result = scanner.next();
+ return result != null && !result.isEmpty();
} catch (IOException e) {
throw new HbaseConnectorException(
HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION,
diff --git
a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClientTest.java
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClientTest.java
new file mode 100644
index 0000000000..70f0fa3251
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClientTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.client;
+
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Constructor;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+
+class HbaseClientTest {
+
+ @Test
+ void testIsExistsDataReturnsFalseWhenScannerNextReturnsNull() throws
Exception {
+ Connection connection = Mockito.mock(Connection.class);
+ Table table = Mockito.mock(Table.class);
+ ResultScanner scanner = Mockito.mock(ResultScanner.class);
+
Mockito.when(connection.getTable(any(TableName.class))).thenReturn(table);
+ Mockito.when(table.getScanner(any(Scan.class))).thenReturn(scanner);
+ Mockito.when(scanner.next()).thenReturn(null);
+
+ HbaseClient client = newHbaseClient(connection);
+
+ assertFalse(client.isExistsData("ns", "tbl"));
+ }
+
+ @Test
+ void testIsExistsDataReturnsTrueWhenScannerHasResult() throws Exception {
+ Connection connection = Mockito.mock(Connection.class);
+ Table table = Mockito.mock(Table.class);
+ ResultScanner scanner = Mockito.mock(ResultScanner.class);
+ Result result = Mockito.mock(Result.class);
+ Mockito.when(result.isEmpty()).thenReturn(false);
+
Mockito.when(connection.getTable(any(TableName.class))).thenReturn(table);
+ Mockito.when(table.getScanner(any(Scan.class))).thenReturn(scanner);
+ Mockito.when(scanner.next()).thenReturn(result);
+
+ HbaseClient client = newHbaseClient(connection);
+
+ assertTrue(client.isExistsData("ns", "tbl"));
+ }
+
+ private HbaseClient newHbaseClient(Connection connection) throws Exception
{
+ HbaseClient.hbaseConfiguration = HBaseConfiguration.create();
+
Mockito.when(connection.getAdmin()).thenReturn(Mockito.mock(Admin.class));
+
Mockito.when(connection.getBufferedMutator(any(BufferedMutatorParams.class)))
+ .thenReturn(Mockito.mock(BufferedMutator.class));
+ HbaseParameters hbaseParameters = Mockito.mock(HbaseParameters.class);
+ Mockito.when(hbaseParameters.getNamespace()).thenReturn("ns");
+ Mockito.when(hbaseParameters.getTable()).thenReturn("tbl");
+ Mockito.when(hbaseParameters.getWriteBufferSize()).thenReturn(1);
+
+ Constructor<HbaseClient> constructor =
+ HbaseClient.class.getDeclaredConstructor(Connection.class,
HbaseParameters.class);
+ constructor.setAccessible(true);
+ return constructor.newInstance(connection, hbaseParameters);
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index b511cde2a1..ff3e7476d6 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -166,6 +166,17 @@ public class HbaseIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(1, execResult.getExitCode());
}
+ @TestTemplate
+ public void testHbaseSinkWithErrorWhenDataExistsOnEmptyTable(TestContainer
container)
+ throws IOException, InterruptedException {
+ deleteData(table);
+ Assertions.assertEquals(0, countData(table));
+ Container.ExecResult execResult =
+
container.executeJob("/fake_to_hbase_with_error_when_data_exists.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(5, countData(table));
+ }
+
@TestTemplate
public void testHbaseSinkWithRecreateSchema(TestContainer container)
throws IOException, InterruptedException {