This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 84b039d4fa [Fix][Connector-V2][Hbase] Fix HBase sink binary rowkey handling (#10300)
84b039d4fa is described below
commit 84b039d4fac3a36ab5c817905be86e83429e32f7
Author: yzeng1618 <[email protected]>
AuthorDate: Fri Jan 9 20:48:20 2026 +0800
[Fix][Connector-V2][Hbase] Fix HBase sink binary rowkey handling (#10300)
---
.../seatunnel/hbase/sink/HbaseSinkWriter.java | 47 +++++++++++++--
.../seatunnel/hbase/sink/HbaseSinkWriterTest.java | 70 ++++++++++++++++++++++
.../seatunnel/e2e/connector/hbase/HbaseIT.java | 30 ++++++++++
.../resources/fake-to-hbase-binary-rowkey.conf | 48 +++++++++++++++
4 files changed, 191 insertions(+), 4 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
index 73ee19f936..99b3cdfba5 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
@@ -144,11 +146,48 @@ public class HbaseSinkWriter
}
private byte[] getRowkeyFromRow(SeaTunnelRow row) {
- String[] rowkeyValues = new String[rowkeyColumnIndexes.size()];
- for (int i = 0; i < rowkeyColumnIndexes.size(); i++) {
- rowkeyValues[i] =
row.getField(rowkeyColumnIndexes.get(i)).toString();
+ int rowkeySize = rowkeyColumnIndexes.size();
+ int firstRowkeyIndex = rowkeyColumnIndexes.get(0);
+ if (rowkeySize == 1 && isBinaryRowkeyColumn(firstRowkeyIndex)) {
+ return (byte[]) row.getField(firstRowkeyIndex);
}
- return Bytes.toBytes(String.join(hbaseParameters.getRowkeyDelimiter(),
rowkeyValues));
+ if (!hasBinaryRowkeyColumn()) {
+ String[] rowkeyValues = new String[rowkeySize];
+ for (int i = 0; i < rowkeySize; i++) {
+ rowkeyValues[i] =
row.getField(rowkeyColumnIndexes.get(i)).toString();
+ }
+ return
Bytes.toBytes(String.join(hbaseParameters.getRowkeyDelimiter(), rowkeyValues));
+ }
+ byte[] delimiter = Bytes.toBytes(hbaseParameters.getRowkeyDelimiter());
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ for (int i = 0; i < rowkeySize; i++) {
+ if (i > 0 && delimiter.length > 0) {
+ output.write(delimiter, 0, delimiter.length);
+ }
+ byte[] bytes = rowkeyFieldToBytes(rowkeyColumnIndexes.get(i), row);
+ output.write(bytes, 0, bytes.length);
+ }
+ return output.toByteArray();
+ }
+
+ private boolean hasBinaryRowkeyColumn() {
+ for (Integer index : rowkeyColumnIndexes) {
+ if (isBinaryRowkeyColumn(index)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isBinaryRowkeyColumn(int index) {
+ return seaTunnelRowType.getFieldType(index).getSqlType() ==
SqlType.BYTES;
+ }
+
+ private byte[] rowkeyFieldToBytes(int index, SeaTunnelRow row) {
+ if (isBinaryRowkeyColumn(index)) {
+ return (byte[]) row.getField(index);
+ }
+ return Bytes.toBytes(row.getField(index).toString());
}
private byte[] convertColumnToBytes(SeaTunnelRow row, int index) {
diff --git a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriterTest.java b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriterTest.java
new file mode 100644
index 0000000000..53fd0b9df0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.sink;
+
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+
+import org.apache.hadoop.hbase.client.Put;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+class HbaseSinkWriterTest {
+
+ @Test
+ void testBinaryRowkeyUsesRawBytes() throws Exception {
+ HbaseParameters hbaseParameters =
+ HbaseParameters.builder()
+ .familyNames(Collections.singletonMap("all_columns",
"info"))
+ .build();
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"rowkey"},
+ new SeaTunnelDataType[]
{PrimitiveByteArrayType.INSTANCE});
+ byte[] rowkey = new byte[] {0x00, 0x01, 0x02, 0x03};
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {rowkey});
+ HbaseClient hbaseClient = Mockito.mock(HbaseClient.class);
+
+ try (MockedStatic<HbaseClient> mockedStatic =
Mockito.mockStatic(HbaseClient.class)) {
+ mockedStatic
+ .when(() ->
HbaseClient.createInstance(Mockito.any(HbaseParameters.class)))
+ .thenReturn(hbaseClient);
+
+ HbaseSinkWriter writer =
+ new HbaseSinkWriter(rowType, hbaseParameters,
Arrays.asList(0), -1);
+ writer.write(row);
+ }
+
+ ArgumentCaptor<Put> putCaptor = ArgumentCaptor.forClass(Put.class);
+ Mockito.verify(hbaseClient).mutate(putCaptor.capture());
+ assertArrayEquals(rowkey, putCaptor.getValue().getRow());
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index c874c36c3f..c75b2377d5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -59,8 +59,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
@Slf4j
@@ -78,6 +80,8 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
private static final String MULTI_TABLE_TWO_NAME = "hbase_sink_2";
+ private static final String BINARY_ROWKEY_TABLE_NAME =
"seatunnel_test_binary_rowkey";
+
private static final String FAMILY_NAME = "info";
private Connection hbaseConnection;
@@ -86,6 +90,7 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
private TableName table;
private TableName tableAssign;
+ private TableName binaryRowkeyTable;
private HbaseCluster hbaseCluster;
@@ -104,6 +109,9 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
hbaseCluster.createTable(ASSIGN_CF_TABLE_NAME, Arrays.asList("cf1",
"cf2"));
table = TableName.valueOf(TABLE_NAME);
tableAssign = TableName.valueOf(ASSIGN_CF_TABLE_NAME);
+ // Create table for hbase binary rowkey sink test
+ hbaseCluster.createTable(BINARY_ROWKEY_TABLE_NAME,
Arrays.asList(FAMILY_NAME));
+ binaryRowkeyTable = TableName.valueOf(BINARY_ROWKEY_TABLE_NAME);
// Create table for hbase multi-table sink test
hbaseCluster.createTable(MULTI_TABLE_ONE_NAME,
Arrays.asList(FAMILY_NAME));
@@ -256,6 +264,28 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
scanner.close();
}
+ @TestTemplate
+ public void testHbaseSinkWithBinaryRowkey(TestContainer container)
+ throws IOException, InterruptedException {
+ deleteData(binaryRowkeyTable);
+ Container.ExecResult execResult =
container.executeJob("/fake-to-hbase-binary-rowkey.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ ArrayList<Result> results = readData(binaryRowkeyTable);
+ Assertions.assertEquals(3, results.size());
+ Set<String> actualRowKeys = new HashSet<>();
+ for (Result result : results) {
+ actualRowKeys.add(Bytes.toStringBinary(result.getRow()));
+ }
+ Set<String> expectedRowKeys =
+ new HashSet<>(
+ Arrays.asList(
+ Bytes.toStringBinary(new byte[] {0x00, 0x01,
0x02, 0x03}),
+ Bytes.toStringBinary(
+ new byte[] {(byte) 0xFF, (byte) 0xFE,
0x0A, 0x0B}),
+ Bytes.toStringBinary(new byte[] {0x10, 0x20,
0x30, 0x40, 0x50})));
+ Assertions.assertEquals(expectedRowKeys, actualRowKeys);
+ }
+
@TestTemplate
public void testHbaseSinkAssignCfSink(TestContainer container)
throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-binary-rowkey.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-binary-rowkey.conf
new file mode 100644
index 0000000000..f9ed268c16
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-binary-rowkey.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ rowkey = bytes
+ data = string
+ }
+ }
+ rows = [
+ {fields = ["AAECAw==", "binary_value_1"], kind = INSERT},
+ {fields = ["//4KCw==", "binary_value_2"], kind = INSERT},
+ {fields = ["ECAwQFA=", "binary_value_3"], kind = INSERT}
+ ]
+ }
+}
+
+sink {
+ Hbase {
+ zookeeper_quorum = "hbase_e2e:2181"
+ table = "seatunnel_test_binary_rowkey"
+ rowkey_column = ["rowkey"]
+ family_name {
+ all_columns = info
+ }
+ }
+}