This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 84b039d4fa [Fix][Connector-V2][Hbase] Fix HBase sink binary rowkey handling (#10300)
84b039d4fa is described below

commit 84b039d4fac3a36ab5c817905be86e83429e32f7
Author: yzeng1618 <[email protected]>
AuthorDate: Fri Jan 9 20:48:20 2026 +0800

    [Fix][Connector-V2][Hbase] Fix HBase sink binary rowkey handling (#10300)
---
 .../seatunnel/hbase/sink/HbaseSinkWriter.java      | 47 +++++++++++++--
 .../seatunnel/hbase/sink/HbaseSinkWriterTest.java  | 70 ++++++++++++++++++++++
 .../seatunnel/e2e/connector/hbase/HbaseIT.java     | 30 ++++++++++
 .../resources/fake-to-hbase-binary-rowkey.conf     | 48 +++++++++++++++
 4 files changed, 191 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
index 73ee19f936..99b3cdfba5 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
@@ -22,6 +22,7 @@ import 
org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
 import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.List;
@@ -144,11 +146,48 @@ public class HbaseSinkWriter
     }
 
     private byte[] getRowkeyFromRow(SeaTunnelRow row) {
-        String[] rowkeyValues = new String[rowkeyColumnIndexes.size()];
-        for (int i = 0; i < rowkeyColumnIndexes.size(); i++) {
-            rowkeyValues[i] = 
row.getField(rowkeyColumnIndexes.get(i)).toString();
+        int rowkeySize = rowkeyColumnIndexes.size();
+        int firstRowkeyIndex = rowkeyColumnIndexes.get(0);
+        if (rowkeySize == 1 && isBinaryRowkeyColumn(firstRowkeyIndex)) {
+            return (byte[]) row.getField(firstRowkeyIndex);
         }
-        return Bytes.toBytes(String.join(hbaseParameters.getRowkeyDelimiter(), 
rowkeyValues));
+        if (!hasBinaryRowkeyColumn()) {
+            String[] rowkeyValues = new String[rowkeySize];
+            for (int i = 0; i < rowkeySize; i++) {
+                rowkeyValues[i] = 
row.getField(rowkeyColumnIndexes.get(i)).toString();
+            }
+            return 
Bytes.toBytes(String.join(hbaseParameters.getRowkeyDelimiter(), rowkeyValues));
+        }
+        byte[] delimiter = Bytes.toBytes(hbaseParameters.getRowkeyDelimiter());
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        for (int i = 0; i < rowkeySize; i++) {
+            if (i > 0 && delimiter.length > 0) {
+                output.write(delimiter, 0, delimiter.length);
+            }
+            byte[] bytes = rowkeyFieldToBytes(rowkeyColumnIndexes.get(i), row);
+            output.write(bytes, 0, bytes.length);
+        }
+        return output.toByteArray();
+    }
+
+    private boolean hasBinaryRowkeyColumn() {
+        for (Integer index : rowkeyColumnIndexes) {
+            if (isBinaryRowkeyColumn(index)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isBinaryRowkeyColumn(int index) {
+        return seaTunnelRowType.getFieldType(index).getSqlType() == 
SqlType.BYTES;
+    }
+
+    private byte[] rowkeyFieldToBytes(int index, SeaTunnelRow row) {
+        if (isBinaryRowkeyColumn(index)) {
+            return (byte[]) row.getField(index);
+        }
+        return Bytes.toBytes(row.getField(index).toString());
     }
 
     private byte[] convertColumnToBytes(SeaTunnelRow row, int index) {
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriterTest.java
 
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriterTest.java
new file mode 100644
index 0000000000..53fd0b9df0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
+
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+
+import org.apache.hadoop.hbase.client.Put;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+class HbaseSinkWriterTest {
+
+    @Test
+    void testBinaryRowkeyUsesRawBytes() throws Exception {
+        HbaseParameters hbaseParameters =
+                HbaseParameters.builder()
+                        .familyNames(Collections.singletonMap("all_columns", 
"info"))
+                        .build();
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"rowkey"},
+                        new SeaTunnelDataType[] 
{PrimitiveByteArrayType.INSTANCE});
+        byte[] rowkey = new byte[] {0x00, 0x01, 0x02, 0x03};
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {rowkey});
+        HbaseClient hbaseClient = Mockito.mock(HbaseClient.class);
+
+        try (MockedStatic<HbaseClient> mockedStatic = 
Mockito.mockStatic(HbaseClient.class)) {
+            mockedStatic
+                    .when(() -> 
HbaseClient.createInstance(Mockito.any(HbaseParameters.class)))
+                    .thenReturn(hbaseClient);
+
+            HbaseSinkWriter writer =
+                    new HbaseSinkWriter(rowType, hbaseParameters, 
Arrays.asList(0), -1);
+            writer.write(row);
+        }
+
+        ArgumentCaptor<Put> putCaptor = ArgumentCaptor.forClass(Put.class);
+        Mockito.verify(hbaseClient).mutate(putCaptor.capture());
+        assertArrayEquals(rowkey, putCaptor.getValue().getRow());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index c874c36c3f..c75b2377d5 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -59,8 +59,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 
 @Slf4j
@@ -78,6 +80,8 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
 
     private static final String MULTI_TABLE_TWO_NAME = "hbase_sink_2";
 
+    private static final String BINARY_ROWKEY_TABLE_NAME = 
"seatunnel_test_binary_rowkey";
+
     private static final String FAMILY_NAME = "info";
 
     private Connection hbaseConnection;
@@ -86,6 +90,7 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
 
     private TableName table;
     private TableName tableAssign;
+    private TableName binaryRowkeyTable;
 
     private HbaseCluster hbaseCluster;
 
@@ -104,6 +109,9 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
         hbaseCluster.createTable(ASSIGN_CF_TABLE_NAME, Arrays.asList("cf1", 
"cf2"));
         table = TableName.valueOf(TABLE_NAME);
         tableAssign = TableName.valueOf(ASSIGN_CF_TABLE_NAME);
+        // Create table for hbase binary rowkey sink test
+        hbaseCluster.createTable(BINARY_ROWKEY_TABLE_NAME, 
Arrays.asList(FAMILY_NAME));
+        binaryRowkeyTable = TableName.valueOf(BINARY_ROWKEY_TABLE_NAME);
 
         // Create table for hbase multi-table sink test
         hbaseCluster.createTable(MULTI_TABLE_ONE_NAME, 
Arrays.asList(FAMILY_NAME));
@@ -256,6 +264,28 @@ public class HbaseIT extends TestSuiteBase implements 
TestResource {
         scanner.close();
     }
 
+    @TestTemplate
+    public void testHbaseSinkWithBinaryRowkey(TestContainer container)
+            throws IOException, InterruptedException {
+        deleteData(binaryRowkeyTable);
+        Container.ExecResult execResult = 
container.executeJob("/fake-to-hbase-binary-rowkey.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        ArrayList<Result> results = readData(binaryRowkeyTable);
+        Assertions.assertEquals(3, results.size());
+        Set<String> actualRowKeys = new HashSet<>();
+        for (Result result : results) {
+            actualRowKeys.add(Bytes.toStringBinary(result.getRow()));
+        }
+        Set<String> expectedRowKeys =
+                new HashSet<>(
+                        Arrays.asList(
+                                Bytes.toStringBinary(new byte[] {0x00, 0x01, 
0x02, 0x03}),
+                                Bytes.toStringBinary(
+                                        new byte[] {(byte) 0xFF, (byte) 0xFE, 
0x0A, 0x0B}),
+                                Bytes.toStringBinary(new byte[] {0x10, 0x20, 
0x30, 0x40, 0x50})));
+        Assertions.assertEquals(expectedRowKeys, actualRowKeys);
+    }
+
     @TestTemplate
     public void testHbaseSinkAssignCfSink(TestContainer container)
             throws IOException, InterruptedException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-binary-rowkey.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-binary-rowkey.conf
new file mode 100644
index 0000000000..f9ed268c16
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-binary-rowkey.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        rowkey = bytes
+        data = string
+      }
+    }
+    rows = [
+      {fields = ["AAECAw==", "binary_value_1"], kind = INSERT},
+      {fields = ["//4KCw==", "binary_value_2"], kind = INSERT},
+      {fields = ["ECAwQFA=", "binary_value_3"], kind = INSERT}
+    ]
+  }
+}
+
+sink {
+  Hbase {
+    zookeeper_quorum = "hbase_e2e:2181"
+    table = "seatunnel_test_binary_rowkey"
+    rowkey_column = ["rowkey"]
+    family_name {
+      all_columns = info
+    }
+  }
+}

Reply via email to