This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9d58bc01ac [Fix][Connector-V2][Hbase] Fix ERROR_WHEN_DATA_EXISTS NPE 
on empty table (#10336)
9d58bc01ac is described below

commit 9d58bc01ac2f97a6febf4766f5b8a2cfd1ed803d
Author: yzeng1618 <[email protected]>
AuthorDate: Thu Jan 15 21:04:16 2026 +0800

    [Fix][Connector-V2][Hbase] Fix ERROR_WHEN_DATA_EXISTS NPE on empty table 
(#10336)
---
 .../seatunnel/hbase/client/HbaseClient.java        | 16 ++--
 .../seatunnel/hbase/client/HbaseClientTest.java    | 89 ++++++++++++++++++++++
 .../seatunnel/e2e/connector/hbase/HbaseIT.java     | 11 +++
 3 files changed, 107 insertions(+), 9 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
index 3b169ae0e4..b034736028 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java
@@ -310,15 +310,13 @@ public class HbaseClient {
      * @return true if the table has data, false otherwise
      */
     public boolean isExistsData(String databaseName, String tableName) {
-        try {
-            Table table = connection.getTable(TableName.valueOf(databaseName, 
tableName));
-            Scan scan = new Scan();
-            scan.setCaching(1);
-            scan.setLimit(1);
-            try (ResultScanner scanner = table.getScanner(scan)) {
-                Result result = scanner.next();
-                return !result.isEmpty();
-            }
+        Scan scan = new Scan();
+        scan.setCaching(1);
+        scan.setLimit(1);
+        try (Table table = connection.getTable(TableName.valueOf(databaseName, 
tableName));
+                ResultScanner scanner = table.getScanner(scan)) {
+            Result result = scanner.next();
+            return result != null && !result.isEmpty();
         } catch (IOException e) {
             throw new HbaseConnectorException(
                     HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION,
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClientTest.java
 
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClientTest.java
new file mode 100644
index 0000000000..70f0fa3251
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClientTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.client;
+
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Constructor;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+
+class HbaseClientTest {
+
+    @Test
+    void testIsExistsDataReturnsFalseWhenScannerNextReturnsNull() throws 
Exception {
+        Connection connection = Mockito.mock(Connection.class);
+        Table table = Mockito.mock(Table.class);
+        ResultScanner scanner = Mockito.mock(ResultScanner.class);
+        
Mockito.when(connection.getTable(any(TableName.class))).thenReturn(table);
+        Mockito.when(table.getScanner(any(Scan.class))).thenReturn(scanner);
+        Mockito.when(scanner.next()).thenReturn(null);
+
+        HbaseClient client = newHbaseClient(connection);
+
+        assertFalse(client.isExistsData("ns", "tbl"));
+    }
+
+    @Test
+    void testIsExistsDataReturnsTrueWhenScannerHasResult() throws Exception {
+        Connection connection = Mockito.mock(Connection.class);
+        Table table = Mockito.mock(Table.class);
+        ResultScanner scanner = Mockito.mock(ResultScanner.class);
+        Result result = Mockito.mock(Result.class);
+        Mockito.when(result.isEmpty()).thenReturn(false);
+        
Mockito.when(connection.getTable(any(TableName.class))).thenReturn(table);
+        Mockito.when(table.getScanner(any(Scan.class))).thenReturn(scanner);
+        Mockito.when(scanner.next()).thenReturn(result);
+
+        HbaseClient client = newHbaseClient(connection);
+
+        assertTrue(client.isExistsData("ns", "tbl"));
+    }
+
+    private HbaseClient newHbaseClient(Connection connection) throws Exception 
{
+        HbaseClient.hbaseConfiguration = HBaseConfiguration.create();
+        
Mockito.when(connection.getAdmin()).thenReturn(Mockito.mock(Admin.class));
+        
Mockito.when(connection.getBufferedMutator(any(BufferedMutatorParams.class)))
+                .thenReturn(Mockito.mock(BufferedMutator.class));
+        HbaseParameters hbaseParameters = Mockito.mock(HbaseParameters.class);
+        Mockito.when(hbaseParameters.getNamespace()).thenReturn("ns");
+        Mockito.when(hbaseParameters.getTable()).thenReturn("tbl");
+        Mockito.when(hbaseParameters.getWriteBufferSize()).thenReturn(1);
+
+        Constructor<HbaseClient> constructor =
+                HbaseClient.class.getDeclaredConstructor(Connection.class, 
HbaseParameters.class);
+        constructor.setAccessible(true);
+        return constructor.newInstance(connection, hbaseParameters);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index b511cde2a1..ff3e7476d6 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -166,6 +166,17 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(1, execResult.getExitCode());
     }
 
+    @TestTemplate
+    public void testHbaseSinkWithErrorWhenDataExistsOnEmptyTable(TestContainer 
container)
+            throws IOException, InterruptedException {
+        deleteData(table);
+        Assertions.assertEquals(0, countData(table));
+        Container.ExecResult execResult =
+                
container.executeJob("/fake_to_hbase_with_error_when_data_exists.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(5, countData(table));
+    }
+
     @TestTemplate
     public void testHbaseSinkWithRecreateSchema(TestContainer container)
             throws IOException, InterruptedException {

Reply via email to