This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new dc5e92341 [FLINK-38160][starrocks] Add support for BINARY and 
VARBINARY types in StarRocks connector (#4303)
dc5e92341 is described below

commit dc5e92341d85e4922181c9466d1948c37c97bc5c
Author: Jia Fan <[email protected]>
AuthorDate: Mon Mar 9 10:49:37 2026 +0800

    [FLINK-38160][starrocks] Add support for BINARY and VARBINARY types in 
StarRocks connector (#4303)
---
 .../connectors/pipeline-connectors/starrocks.md    | 15 +++++
 .../connectors/pipeline-connectors/starrocks.md    | 15 +++++
 .../connectors/starrocks/sink/StarRocksUtils.java  | 26 ++++++++
 .../starrocks/sink/CdcDataTypeTransformerTest.java | 37 +++++++++++
 .../sink/EventRecordSerializationSchemaTest.java   | 74 ++++++++++++++++++++++
 .../sink/StarRocksMetadataApplierITCase.java       | 13 ++--
 .../starrocks/sink/StarRocksUtilsTest.java         | 74 ++++++++++++++++++----
 7 files changed, 237 insertions(+), 17 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
index 98f79f86a..d967993dd 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
@@ -337,6 +337,21 @@ pipeline:
       <td>CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC 
中的长度对应到 StarRocks
           中为 n * 3。</td>
     </tr>
+    <tr>
+      <td>BINARY(n)</td>
+      <td>VARBINARY(min(n,1048576))</td>
+      <td>长度上限为 1048576。</td>
+    </tr>
+    <tr>
+      <td>VARBINARY(n)</td>
+      <td>VARBINARY(min(n,1048576))</td>
+      <td>长度上限为 1048576。</td>
+    </tr>
+    <tr>
+      <td>BYTES</td>
+      <td>VARBINARY(1048576)</td>
+      <td>BYTES 映射为最大长度为 1048576 的 VARBINARY。</td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/starrocks.md 
b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
index 67fbbf046..a1ff9d463 100644
--- a/docs/content/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
@@ -348,6 +348,21 @@ pipeline:
       <td>CDC defines the length by characters, and StarRocks defines it by 
bytes. According to UTF-8, one Chinese 
         character is equal to three bytes, so the length for StarRocks is n * 
3.</td>
     </tr>
+    <tr>
+      <td>BINARY(n)</td>
+      <td>VARBINARY(min(n,1048576))</td>
+      <td>The length is capped to 1048576.</td>
+    </tr>
+    <tr>
+      <td>VARBINARY(n)</td>
+      <td>VARBINARY(min(n,1048576))</td>
+      <td>The length is capped to 1048576.</td>
+    </tr>
+    <tr>
+      <td>BYTES</td>
+      <td>VARBINARY(1048576)</td>
+      <td>BYTES is mapped to VARBINARY with max length 1048576.</td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
index 22237a5f3..f72f5ddc5 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
 import org.apache.flink.cdc.common.types.BooleanType;
 import org.apache.flink.cdc.common.types.CharType;
 import org.apache.flink.cdc.common.types.DataType;
@@ -36,6 +37,7 @@ import org.apache.flink.cdc.common.types.SmallIntType;
 import org.apache.flink.cdc.common.types.TimeType;
 import org.apache.flink.cdc.common.types.TimestampType;
 import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
 import org.apache.flink.cdc.common.types.VarCharType;
 
 import com.starrocks.connector.flink.catalog.StarRocksColumn;
@@ -215,6 +217,10 @@ public class StarRocksUtils {
                 fieldGetter =
                         record -> 
record.getDate(fieldPos).toLocalDate().format(DATE_FORMATTER);
                 break;
+            case BINARY:
+            case VARBINARY:
+                fieldGetter = record -> record.getBinary(fieldPos);
+                break;
             case TIME_WITHOUT_TIME_ZONE:
                 fieldGetter =
                         record ->
@@ -273,6 +279,7 @@ public class StarRocksUtils {
     public static final String STRING = "STRING";
     public static final String DATE = "DATE";
     public static final String DATETIME = "DATETIME";
+    public static final String VARBINARY = "VARBINARY";
     public static final String JSON = "JSON";
 
     /** Max size of char type of StarRocks. */
@@ -281,6 +288,9 @@ public class StarRocksUtils {
     /** Max size of varchar type of StarRocks. */
     public static final int MAX_VARCHAR_SIZE = 1048576;
 
+    /** Max size of varbinary type of StarRocks. */
+    public static final int MAX_VARBINARY_SIZE = 1048576;
+
     /** Transforms CDC {@link DataType} to StarRocks data type. */
     public static class CdcDataTypeTransformer
             extends DataTypeDefaultVisitor<StarRocksColumn.Builder> {
@@ -406,6 +416,22 @@ public class StarRocksUtils {
             return builder;
         }
 
+        @Override
+        public StarRocksColumn.Builder visit(BinaryType binaryType) {
+            builder.setDataType(VARBINARY);
+            builder.setNullable(binaryType.isNullable());
+            builder.setColumnSize(Math.min(binaryType.getLength(), 
MAX_VARBINARY_SIZE));
+            return builder;
+        }
+
+        @Override
+        public StarRocksColumn.Builder visit(VarBinaryType varBinaryType) {
+            builder.setDataType(VARBINARY);
+            builder.setNullable(varBinaryType.isNullable());
+            builder.setColumnSize(Math.min(varBinaryType.getLength(), 
MAX_VARBINARY_SIZE));
+            return builder;
+        }
+
         @Override
         public StarRocksColumn.Builder visit(DateType dateType) {
             builder.setDataType(DATE);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
index 4bfc6685f..8fe51a1c4 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.flink.cdc.connectors.starrocks.sink;
 
+import org.apache.flink.cdc.common.types.BinaryType;
 import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
 import org.apache.flink.cdc.common.types.VarCharType;
 
 import com.starrocks.connector.flink.catalog.StarRocksColumn;
@@ -146,4 +149,38 @@ class CdcDataTypeTransformerTest {
                 .hasValue(StarRocksUtils.MAX_VARCHAR_SIZE);
         Assertions.assertThat(largeLengthColumn.isNullable()).isTrue();
     }
+
+    @Test
+    void testBinaryType() {
+        StarRocksColumn.Builder builder =
+                new 
StarRocksColumn.Builder().setColumnName("binary_col").setOrdinalPosition(0);
+        new BinaryType(17).accept(new 
StarRocksUtils.CdcDataTypeTransformer(false, builder));
+        StarRocksColumn column = builder.build();
+        
Assertions.assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
+        Assertions.assertThat(column.getColumnSize()).hasValue(17);
+        Assertions.assertThat(column.isNullable()).isTrue();
+    }
+
+    @Test
+    void testVarBinaryType() {
+        StarRocksColumn.Builder builder =
+                new 
StarRocksColumn.Builder().setColumnName("varbinary_col").setOrdinalPosition(0);
+        new VarBinaryType(255).accept(new 
StarRocksUtils.CdcDataTypeTransformer(false, builder));
+        StarRocksColumn column = builder.build();
+        
Assertions.assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
+        Assertions.assertThat(column.getColumnSize()).hasValue(255);
+        Assertions.assertThat(column.isNullable()).isTrue();
+    }
+
+    @Test
+    void testBytesType() {
+        // BYTES is VarBinaryType with MAX_LENGTH, should be capped to 
MAX_VARBINARY_SIZE
+        StarRocksColumn.Builder builder =
+                new 
StarRocksColumn.Builder().setColumnName("bytes_col").setOrdinalPosition(0);
+        DataTypes.BYTES().accept(new 
StarRocksUtils.CdcDataTypeTransformer(false, builder));
+        StarRocksColumn column = builder.build();
+        
Assertions.assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
+        
Assertions.assertThat(column.getColumnSize()).hasValue(StarRocksUtils.MAX_VARBINARY_SIZE);
+        Assertions.assertThat(column.isNullable()).isTrue();
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
index a9c6f240e..6d53abeb4 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.cdc.common.event.DropColumnEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.BinaryType;
 import org.apache.flink.cdc.common.types.BooleanType;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DateType;
@@ -48,6 +49,7 @@ import 
org.apache.flink.cdc.common.types.LocalZonedTimestampType;
 import org.apache.flink.cdc.common.types.SmallIntType;
 import org.apache.flink.cdc.common.types.TimeType;
 import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
 import org.apache.flink.cdc.common.types.VarCharType;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
@@ -77,6 +79,7 @@ import java.time.LocalTime;
 import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.Objects;
 import java.util.OptionalLong;
@@ -484,6 +487,77 @@ class EventRecordSerializationSchemaTest {
                 tableId, 
"{\"id\":1,\"not_null_time\":\"12:00:00\",\"__op\":0}", result);
     }
 
+    @Test
+    void testBinaryTypeSerialization() throws Exception {
+        TableId tableId = TableId.parse("test.binary_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("bin_col", new BinaryType(10))
+                        .physicalColumn("varbin_col", new VarBinaryType(255))
+                        .primaryKey("id")
+                        .build();
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, 
schema);
+        Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator =
+                new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+        byte[] binData = new byte[] {1, 2, 3, 4, 5};
+        byte[] varBinData = new byte[] {0x0A, 0x0B, 0x0C};
+
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        tableId, generator.generate(new Object[] {1, binData, 
varBinData}));
+
+        StarRocksRowData result = serializer.serialize(insertEvent);
+        Assertions.assertThat(result).isNotNull();
+        
Assertions.assertThat(result.getDatabase()).isEqualTo(tableId.getSchemaName());
+        
Assertions.assertThat(result.getTable()).isEqualTo(tableId.getTableName());
+
+        // Binary data is serialized as Base64-encoded strings in JSON by 
Jackson
+        String expectedBin = Base64.getEncoder().encodeToString(binData);
+        String expectedVarBin = Base64.getEncoder().encodeToString(varBinData);
+        String expectedJson =
+                String.format(
+                        
"{\"id\":1,\"bin_col\":\"%s\",\"varbin_col\":\"%s\",\"__op\":0}",
+                        expectedBin, expectedVarBin);
+
+        SortedMap<String, Object> expectMap =
+                objectMapper.readValue(
+                        expectedJson, new TypeReference<TreeMap<String, 
Object>>() {});
+        SortedMap<String, Object> actualMap =
+                objectMapper.readValue(
+                        result.getRow(), new TypeReference<TreeMap<String, 
Object>>() {});
+        Assertions.assertThat(actualMap).isEqualTo(expectMap);
+    }
+
+    @Test
+    void testBinaryTypeWithNullValues() throws Exception {
+        TableId tableId = TableId.parse("test.binary_null_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("nullable_bin", new VarBinaryType(100))
+                        .primaryKey("id")
+                        .build();
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, 
schema);
+        Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator =
+                new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(tableId, generator.generate(new 
Object[] {1, null}));
+
+        StarRocksRowData result = serializer.serialize(insertEvent);
+        Assertions.assertThat(result).isNotNull();
+
+        verifySerializeResult(tableId, "{\"id\":1,\"__op\":0}", result);
+    }
+
     private void verifySerializeResult(
             TableId expectTable, String expectRow, StarRocksRowData 
actualRowData)
             throws Exception {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index 6501ba03c..9a6b3ed47 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -197,11 +197,11 @@ class StarRocksMetadataApplierITCase extends 
StarRocksSinkTestBase {
         Schema schema =
                 Schema.newBuilder()
                         .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), "ID"))
-                        // StarRocks sink doesn't support BINARY and BYTES 
type yet.
-                        // .column(new PhysicalColumn("binary", 
DataTypes.BINARY(17), "Binary"))
-                        // .column(new PhysicalColumn("varbinary", 
DataTypes.VARBINARY(17), "Var
-                        // Binary"))
-                        // .column(new PhysicalColumn("bytes", 
DataTypes.BYTES(), "Bytes"))
+                        .column(new PhysicalColumn("binary", 
DataTypes.BINARY(17), "Binary"))
+                        .column(
+                                new PhysicalColumn(
+                                        "varbinary", DataTypes.VARBINARY(17), 
"Var Binary"))
+                        .column(new PhysicalColumn("bytes", DataTypes.BYTES(), 
"Bytes"))
                         .column(new PhysicalColumn("boolean", 
DataTypes.BOOLEAN(), "Boolean"))
                         .column(new PhysicalColumn("int", DataTypes.INT(), 
"Int"))
                         .column(new PhysicalColumn("tinyint", 
DataTypes.TINYINT(), "Tiny Int"))
@@ -245,6 +245,9 @@ class StarRocksMetadataApplierITCase extends 
StarRocksSinkTestBase {
         List<String> expected =
                 Arrays.asList(
                         "id | int | NO | true | null",
+                        "binary | varbinary | YES | false | null",
+                        "varbinary | varbinary | YES | false | null",
+                        "bytes | varbinary | YES | false | null",
                         "boolean | boolean | YES | false | null",
                         "int | int | YES | false | null",
                         "tinyint | tinyint | YES | false | null",
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
index 3e93c9fd5..cc99b3bc4 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
 import org.apache.flink.cdc.common.types.BooleanType;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
@@ -37,6 +38,7 @@ import 
org.apache.flink.cdc.common.types.LocalZonedTimestampType;
 import org.apache.flink.cdc.common.types.SmallIntType;
 import org.apache.flink.cdc.common.types.TimestampType;
 import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
 import org.apache.flink.cdc.common.types.VarCharType;
 import org.apache.flink.cdc.common.types.ZonedTimestampType;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
@@ -300,13 +302,36 @@ class StarRocksUtilsTest {
     }
 
     @Test
-    void testToStarRocksDataTypeUnsupported() {
+    void testToStarRocksDataTypeBinary() {
         StarRocksColumn.Builder builder =
                 new 
StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
-        assertThatThrownBy(
-                        () -> 
StarRocksUtils.toStarRocksDataType(DataTypes.BYTES(), false, builder))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Unsupported CDC data type");
+        StarRocksUtils.toStarRocksDataType(new BinaryType(17), false, builder);
+        StarRocksColumn column = builder.build();
+        assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
+        assertThat(column.getColumnSize()).hasValue(17);
+        assertThat(column.isNullable()).isTrue();
+    }
+
+    @Test
+    void testToStarRocksDataTypeVarBinary() {
+        StarRocksColumn.Builder builder =
+                new 
StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
+        StarRocksUtils.toStarRocksDataType(new VarBinaryType(255), false, 
builder);
+        StarRocksColumn column = builder.build();
+        assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
+        assertThat(column.getColumnSize()).hasValue(255);
+        assertThat(column.isNullable()).isTrue();
+    }
+
+    @Test
+    void testToStarRocksDataTypeBytes() {
+        StarRocksColumn.Builder builder =
+                new 
StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
+        StarRocksUtils.toStarRocksDataType(DataTypes.BYTES(), false, builder);
+        StarRocksColumn column = builder.build();
+        assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
+        
assertThat(column.getColumnSize()).hasValue(StarRocksUtils.MAX_VARBINARY_SIZE);
+        assertThat(column.isNullable()).isTrue();
     }
 
     private void assertStarRocksDataType(DataType cdcType, String 
expectedStarRocksType) {
@@ -418,12 +443,37 @@ class StarRocksUtilsTest {
     }
 
     @Test
-    void testCreateFieldGetterUnsupportedType() {
-        assertThatThrownBy(
-                        () ->
-                                StarRocksUtils.createFieldGetter(
-                                        DataTypes.BYTES(), 0, 
ZoneId.of("UTC")))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Don't support data type");
+    void testCreateFieldGetterBinary() {
+        RecordData.FieldGetter getter =
+                StarRocksUtils.createFieldGetter(new BinaryType(10), 0, 
ZoneId.of("UTC"));
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(new DataType[] {new 
BinaryType(10)});
+        byte[] expected = new byte[] {1, 2, 3, 4, 5};
+        BinaryRecordData record = generator.generate(new Object[] {expected});
+        assertThat(getter.getFieldOrNull(record)).isEqualTo(expected);
+    }
+
+    @Test
+    void testCreateFieldGetterVarBinary() {
+        RecordData.FieldGetter getter =
+                StarRocksUtils.createFieldGetter(new VarBinaryType(255), 0, 
ZoneId.of("UTC"));
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(new DataType[] {new 
VarBinaryType(255)});
+        byte[] expected = new byte[] {0x0A, 0x0B, 0x0C};
+        BinaryRecordData record = generator.generate(new Object[] {expected});
+        assertThat(getter.getFieldOrNull(record)).isEqualTo(expected);
+    }
+
+    @Test
+    void testCreateFieldGetterBinaryNullable() {
+        RecordData.FieldGetter getter =
+                StarRocksUtils.createFieldGetter(DataTypes.BYTES(), 0, 
ZoneId.of("UTC"));
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(new DataType[] 
{DataTypes.BYTES()});
+        BinaryRecordData record = generator.generate(new Object[] {null});
+        assertThat(getter.getFieldOrNull(record)).isNull();
     }
 }

Reply via email to