This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new dc5e92341 [FLINK-38160][starrocks] Add support for BINARY and
VARBINARY types in StarRocks connector (#4303)
dc5e92341 is described below
commit dc5e92341d85e4922181c9466d1948c37c97bc5c
Author: Jia Fan <[email protected]>
AuthorDate: Mon Mar 9 10:49:37 2026 +0800
[FLINK-38160][starrocks] Add support for BINARY and VARBINARY types in
StarRocks connector (#4303)
---
.../connectors/pipeline-connectors/starrocks.md | 15 +++++
.../connectors/pipeline-connectors/starrocks.md | 15 +++++
.../connectors/starrocks/sink/StarRocksUtils.java | 26 ++++++++
.../starrocks/sink/CdcDataTypeTransformerTest.java | 37 +++++++++++
.../sink/EventRecordSerializationSchemaTest.java | 74 ++++++++++++++++++++++
.../sink/StarRocksMetadataApplierITCase.java | 13 ++--
.../starrocks/sink/StarRocksUtilsTest.java | 74 ++++++++++++++++++----
7 files changed, 237 insertions(+), 17 deletions(-)
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
index 98f79f86a..d967993dd 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
@@ -337,6 +337,21 @@ pipeline:
<td>CDC 中长度表示字符数,而 StarRocks 中长度表示字节数。根据 UTF-8 编码,一个中文字符占用三个字节,因此 CDC
中的长度对应到 StarRocks
中为 n * 3。</td>
</tr>
+ <tr>
+ <td>BINARY(n)</td>
+ <td>VARBINARY(min(n,1048576))</td>
+ <td>长度上限为 1048576。</td>
+ </tr>
+ <tr>
+ <td>VARBINARY(n)</td>
+ <td>VARBINARY(min(n,1048576))</td>
+ <td>长度上限为 1048576。</td>
+ </tr>
+ <tr>
+ <td>BYTES</td>
+ <td>VARBINARY(1048576)</td>
+ <td>BYTES 映射为最大长度为 1048576 的 VARBINARY。</td>
+ </tr>
</tbody>
</table>
</div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/starrocks.md
b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
index 67fbbf046..a1ff9d463 100644
--- a/docs/content/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
@@ -348,6 +348,21 @@ pipeline:
<td>CDC defines the length by characters, and StarRocks defines it by
bytes. According to UTF-8, one Chinese
character is equal to three bytes, so the length for StarRocks is n *
3.</td>
</tr>
+ <tr>
+ <td>BINARY(n)</td>
+ <td>VARBINARY(min(n,1048576))</td>
+ <td>The length is capped to 1048576.</td>
+ </tr>
+ <tr>
+ <td>VARBINARY(n)</td>
+ <td>VARBINARY(min(n,1048576))</td>
+ <td>The length is capped to 1048576.</td>
+ </tr>
+ <tr>
+ <td>BYTES</td>
+ <td>VARBINARY(1048576)</td>
+ <td>BYTES is mapped to VARBINARY with max length 1048576.</td>
+ </tr>
</tbody>
</table>
</div>
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
index 22237a5f3..f72f5ddc5 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
import org.apache.flink.cdc.common.types.BooleanType;
import org.apache.flink.cdc.common.types.CharType;
import org.apache.flink.cdc.common.types.DataType;
@@ -36,6 +37,7 @@ import org.apache.flink.cdc.common.types.SmallIntType;
import org.apache.flink.cdc.common.types.TimeType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
import org.apache.flink.cdc.common.types.VarCharType;
import com.starrocks.connector.flink.catalog.StarRocksColumn;
@@ -215,6 +217,10 @@ public class StarRocksUtils {
fieldGetter =
record ->
record.getDate(fieldPos).toLocalDate().format(DATE_FORMATTER);
break;
+ case BINARY:
+ case VARBINARY:
+ fieldGetter = record -> record.getBinary(fieldPos);
+ break;
case TIME_WITHOUT_TIME_ZONE:
fieldGetter =
record ->
@@ -273,6 +279,7 @@ public class StarRocksUtils {
public static final String STRING = "STRING";
public static final String DATE = "DATE";
public static final String DATETIME = "DATETIME";
+ public static final String VARBINARY = "VARBINARY";
public static final String JSON = "JSON";
/** Max size of char type of StarRocks. */
@@ -281,6 +288,9 @@ public class StarRocksUtils {
/** Max size of varchar type of StarRocks. */
public static final int MAX_VARCHAR_SIZE = 1048576;
+ /** Max size of varbinary type of StarRocks. */
+ public static final int MAX_VARBINARY_SIZE = 1048576;
+
/** Transforms CDC {@link DataType} to StarRocks data type. */
public static class CdcDataTypeTransformer
extends DataTypeDefaultVisitor<StarRocksColumn.Builder> {
@@ -406,6 +416,22 @@ public class StarRocksUtils {
return builder;
}
+        @Override
+        public StarRocksColumn.Builder visit(BinaryType binaryType) {
+            builder.setDataType(VARBINARY);
+            builder.setNullable(binaryType.isNullable());
+            builder.setColumnSize(Math.min(binaryType.getLength(), MAX_VARBINARY_SIZE));
+            return builder;
+        }
+
+        @Override
+        public StarRocksColumn.Builder visit(VarBinaryType varBinaryType) {
+            builder.setDataType(VARBINARY);
+            builder.setNullable(varBinaryType.isNullable());
+            builder.setColumnSize(Math.min(varBinaryType.getLength(), MAX_VARBINARY_SIZE));
+            return builder;
+        }
+
@Override
public StarRocksColumn.Builder visit(DateType dateType) {
builder.setDataType(DATE);
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
index 4bfc6685f..8fe51a1c4 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
@@ -17,8 +17,11 @@
package org.apache.flink.cdc.connectors.starrocks.sink;
+import org.apache.flink.cdc.common.types.BinaryType;
import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
import org.apache.flink.cdc.common.types.VarCharType;
import com.starrocks.connector.flink.catalog.StarRocksColumn;
@@ -146,4 +149,38 @@ class CdcDataTypeTransformerTest {
.hasValue(StarRocksUtils.MAX_VARCHAR_SIZE);
Assertions.assertThat(largeLengthColumn.isNullable()).isTrue();
}
+
+    @Test
+    void testBinaryType() {
+        StarRocksColumn.Builder builder =
+                new StarRocksColumn.Builder().setColumnName("binary_col").setOrdinalPosition(0);
+        new BinaryType(17).accept(new StarRocksUtils.CdcDataTypeTransformer(false, builder));
+        StarRocksColumn column = builder.build();
+        Assertions.assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
+        Assertions.assertThat(column.getColumnSize()).hasValue(17);
+        Assertions.assertThat(column.isNullable()).isTrue();
+    }
+
+    @Test
+    void testVarBinaryType() {
+        StarRocksColumn.Builder builder =
+                new StarRocksColumn.Builder().setColumnName("varbinary_col").setOrdinalPosition(0);
+        new VarBinaryType(255).accept(new StarRocksUtils.CdcDataTypeTransformer(false, builder));
+        StarRocksColumn column = builder.build();
+        Assertions.assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
+        Assertions.assertThat(column.getColumnSize()).hasValue(255);
+        Assertions.assertThat(column.isNullable()).isTrue();
+    }
+
+    @Test
+    void testBytesType() {
+        // BYTES is VarBinaryType with MAX_LENGTH, should be capped to MAX_VARBINARY_SIZE
+        StarRocksColumn.Builder builder =
+                new StarRocksColumn.Builder().setColumnName("bytes_col").setOrdinalPosition(0);
+        DataTypes.BYTES().accept(new StarRocksUtils.CdcDataTypeTransformer(false, builder));
+        StarRocksColumn column = builder.build();
+        Assertions.assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
+        Assertions.assertThat(column.getColumnSize()).hasValue(StarRocksUtils.MAX_VARBINARY_SIZE);
+        Assertions.assertThat(column.isNullable()).isTrue();
+    }
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
index a9c6f240e..6d53abeb4 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.BinaryType;
import org.apache.flink.cdc.common.types.BooleanType;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DateType;
@@ -48,6 +49,7 @@ import
org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.SmallIntType;
import org.apache.flink.cdc.common.types.TimeType;
import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
import org.apache.flink.cdc.common.types.VarCharType;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
@@ -77,6 +79,7 @@ import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
+import java.util.Base64;
import java.util.HashMap;
import java.util.Objects;
import java.util.OptionalLong;
@@ -484,6 +487,77 @@ class EventRecordSerializationSchemaTest {
tableId,
"{\"id\":1,\"not_null_time\":\"12:00:00\",\"__op\":0}", result);
}
+    @Test
+    void testBinaryTypeSerialization() throws Exception {
+        TableId tableId = TableId.parse("test.binary_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("bin_col", new BinaryType(10))
+                        .physicalColumn("varbin_col", new VarBinaryType(255))
+                        .primaryKey("id")
+                        .build();
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+        Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+        byte[] binData = new byte[] {1, 2, 3, 4, 5};
+        byte[] varBinData = new byte[] {0x0A, 0x0B, 0x0C};
+
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        tableId, generator.generate(new Object[] {1, binData, varBinData}));
+
+        StarRocksRowData result = serializer.serialize(insertEvent);
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getDatabase()).isEqualTo(tableId.getSchemaName());
+        Assertions.assertThat(result.getTable()).isEqualTo(tableId.getTableName());
+
+        // Binary data is serialized as Base64-encoded strings in JSON by Jackson
+        String expectedBin = Base64.getEncoder().encodeToString(binData);
+        String expectedVarBin = Base64.getEncoder().encodeToString(varBinData);
+        String expectedJson =
+                String.format(
+                        "{\"id\":1,\"bin_col\":\"%s\",\"varbin_col\":\"%s\",\"__op\":0}",
+                        expectedBin, expectedVarBin);
+
+        SortedMap<String, Object> expectMap =
+                objectMapper.readValue(
+                        expectedJson, new TypeReference<TreeMap<String, Object>>() {});
+        SortedMap<String, Object> actualMap =
+                objectMapper.readValue(
+                        result.getRow(), new TypeReference<TreeMap<String, Object>>() {});
+        Assertions.assertThat(actualMap).isEqualTo(expectMap);
+    }
+
+    @Test
+    void testBinaryTypeWithNullValues() throws Exception {
+        TableId tableId = TableId.parse("test.binary_null_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("nullable_bin", new VarBinaryType(100))
+                        .primaryKey("id")
+                        .build();
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+        Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(tableId, generator.generate(new Object[] {1, null}));
+
+        StarRocksRowData result = serializer.serialize(insertEvent);
+        Assertions.assertThat(result).isNotNull();
+
+        verifySerializeResult(tableId, "{\"id\":1,\"__op\":0}", result);
+    }
+
private void verifySerializeResult(
TableId expectTable, String expectRow, StarRocksRowData
actualRowData)
throws Exception {
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index 6501ba03c..9a6b3ed47 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -197,11 +197,11 @@ class StarRocksMetadataApplierITCase extends
StarRocksSinkTestBase {
         Schema schema =
                 Schema.newBuilder()
                         .column(new PhysicalColumn("id", DataTypes.INT().notNull(), "ID"))
-                        // StarRocks sink doesn't support BINARY and BYTES type yet.
-                        // .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary"))
-                        // .column(new PhysicalColumn("varbinary", DataTypes.VARBINARY(17), "Var
-                        // Binary"))
-                        // .column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes"))
+                        .column(new PhysicalColumn("binary", DataTypes.BINARY(17), "Binary"))
+                        .column(
+                                new PhysicalColumn(
+                                        "varbinary", DataTypes.VARBINARY(17), "Var Binary"))
+                        .column(new PhysicalColumn("bytes", DataTypes.BYTES(), "Bytes"))
                         .column(new PhysicalColumn("boolean", DataTypes.BOOLEAN(), "Boolean"))
                         .column(new PhysicalColumn("int", DataTypes.INT(), "Int"))
                         .column(new PhysicalColumn("tinyint", DataTypes.TINYINT(), "Tiny Int"))
@@ -245,6 +245,9 @@ class StarRocksMetadataApplierITCase extends
StarRocksSinkTestBase {
List<String> expected =
Arrays.asList(
"id | int | NO | true | null",
+ "binary | varbinary | YES | false | null",
+ "varbinary | varbinary | YES | false | null",
+ "bytes | varbinary | YES | false | null",
"boolean | boolean | YES | false | null",
"int | int | YES | false | null",
"tinyint | tinyint | YES | false | null",
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
index 3e93c9fd5..cc99b3bc4 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
@@ -26,6 +26,7 @@ import
org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
import org.apache.flink.cdc.common.types.BooleanType;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
@@ -37,6 +38,7 @@ import
org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.SmallIntType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
import org.apache.flink.cdc.common.types.VarCharType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
@@ -300,13 +302,36 @@ class StarRocksUtilsTest {
}
     @Test
-    void testToStarRocksDataTypeUnsupported() {
+    void testToStarRocksDataTypeBinary() {
         StarRocksColumn.Builder builder =
                 new StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
-        assertThatThrownBy(
-                        () -> StarRocksUtils.toStarRocksDataType(DataTypes.BYTES(), false, builder))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Unsupported CDC data type");
+        StarRocksUtils.toStarRocksDataType(new BinaryType(17), false, builder);
+        StarRocksColumn column = builder.build();
+        assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
+        assertThat(column.getColumnSize()).hasValue(17);
+        assertThat(column.isNullable()).isTrue();
+    }
+
+    @Test
+    void testToStarRocksDataTypeVarBinary() {
+        StarRocksColumn.Builder builder =
+                new StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
+        StarRocksUtils.toStarRocksDataType(new VarBinaryType(255), false, builder);
+        StarRocksColumn column = builder.build();
+        assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
+        assertThat(column.getColumnSize()).hasValue(255);
+        assertThat(column.isNullable()).isTrue();
+    }
+
+    @Test
+    void testToStarRocksDataTypeBytes() {
+        StarRocksColumn.Builder builder =
+                new StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
+        StarRocksUtils.toStarRocksDataType(DataTypes.BYTES(), false, builder);
+        StarRocksColumn column = builder.build();
+        assertThat(column.getDataType()).isEqualTo(StarRocksUtils.VARBINARY);
+        assertThat(column.getColumnSize()).hasValue(StarRocksUtils.MAX_VARBINARY_SIZE);
+        assertThat(column.isNullable()).isTrue();
     }
private void assertStarRocksDataType(DataType cdcType, String
expectedStarRocksType) {
@@ -418,12 +443,37 @@ class StarRocksUtilsTest {
}
     @Test
-    void testCreateFieldGetterUnsupportedType() {
-        assertThatThrownBy(
-                        () ->
-                                StarRocksUtils.createFieldGetter(
-                                        DataTypes.BYTES(), 0, ZoneId.of("UTC")))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Don't support data type");
+    void testCreateFieldGetterBinary() {
+        RecordData.FieldGetter getter =
+                StarRocksUtils.createFieldGetter(new BinaryType(10), 0, ZoneId.of("UTC"));
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(new DataType[] {new BinaryType(10)});
+        byte[] expected = new byte[] {1, 2, 3, 4, 5};
+        BinaryRecordData record = generator.generate(new Object[] {expected});
+        assertThat(getter.getFieldOrNull(record)).isEqualTo(expected);
+    }
+
+    @Test
+    void testCreateFieldGetterVarBinary() {
+        RecordData.FieldGetter getter =
+                StarRocksUtils.createFieldGetter(new VarBinaryType(255), 0, ZoneId.of("UTC"));
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(new DataType[] {new VarBinaryType(255)});
+        byte[] expected = new byte[] {0x0A, 0x0B, 0x0C};
+        BinaryRecordData record = generator.generate(new Object[] {expected});
+        assertThat(getter.getFieldOrNull(record)).isEqualTo(expected);
+    }
+
+    @Test
+    void testCreateFieldGetterBinaryNullable() {
+        RecordData.FieldGetter getter =
+                StarRocksUtils.createFieldGetter(DataTypes.BYTES(), 0, ZoneId.of("UTC"));
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(new DataType[] {DataTypes.BYTES()});
+        BinaryRecordData record = generator.generate(new Object[] {null});
+        assertThat(getter.getFieldOrNull(record)).isNull();
     }
}