This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 842112286 [FLINK-38886] Introduce `GenericRecordData` and Internal /
External converters in flink-cdc-common (#4218)
842112286 is described below
commit 842112286864d411cdc32572083d1b48835f67e7
Author: yuxiqian <[email protected]>
AuthorDate: Fri Jan 23 15:54:48 2026 +0800
[FLINK-38886] Introduce `GenericRecordData` and Internal / External
converters in flink-cdc-common (#4218)
---
docs/content.zh/docs/core-concept/type-mappings.md | 62 +++
docs/content/docs/core-concept/type-mappings.md | 62 +++
.../cdc/common/converter/CommonConverter.java | 449 +++++++++++++++++++++
.../common/converter/InternalClassConverter.java | 174 ++++++++
.../common/converter/InternalObjectConverter.java | 176 ++++++++
.../cdc/common/converter/JavaClassConverter.java | 174 ++++++++
.../cdc/common/converter/JavaObjectConverter.java | 172 ++++++++
.../flink/cdc/common/data/GenericArrayData.java | 5 +
.../flink/cdc/common/data/GenericRecordData.java | 239 +++++++++++
.../flink/cdc/common/utils/SchemaMergingUtils.java | 9 +-
.../converter/InternalClassConverterTest.java | 97 +++++
.../converter/InternalObjectConverterTest.java | 409 +++++++++++++++++++
.../common/converter/JavaClassConverterTest.java | 97 +++++
.../common/converter/JavaObjectConverterTest.java | 389 ++++++++++++++++++
.../common/converter/VariantConvertingTest.java | 144 +++++++
.../cdc/common/utils/SchemaMergingUtilsTest.java | 47 ++-
16 files changed, 2687 insertions(+), 18 deletions(-)
diff --git a/docs/content.zh/docs/core-concept/type-mappings.md
b/docs/content.zh/docs/core-concept/type-mappings.md
new file mode 100644
index 000000000..a58784fa2
--- /dev/null
+++ b/docs/content.zh/docs/core-concept/type-mappings.md
@@ -0,0 +1,62 @@
+---
+title: "Type Mappings"
+weight: 8
+type: docs
+aliases:
+ - /core-concept/type-mappings/
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# 类型映射
+
+对于每个 CDC 数据类型(`org.apache.flink.cdc.common.types.DataType`
的子类),我们规定了用于内部序列化和反序列化的 CDC 内部类型,以及用于类型合并、类型转换和 UDF 求值的 Java 外部类型。
+
+## 内部类型和外部类型
+
+一些基本类型对于内部类型和 Java 类可能具有相同的表示形式(例如,`DataTypes.INT()` 使用 `java.lang.Integer`
同时作为内部和外部类型)。
+其他类型可能使用不同的表示形式,例如,`DataTypes.TIMESTAMP` 在内部表示中使用
`org.apache.flink.cdc.common.data.TimestampData`,在外部操作中使用
`java.time.LocalDateTime`。
+
+如果您正在编写 YAML Pipeline 连接器,`DataChangeEvent` 应当携带内部类型
`RecordData`,并且其所有字段都是内部类型的实例。
+
+如果您正在编写 Transform UDF,则其参数和返回值类型应定义为其外部 Java 类型。
+
+## 完整类型列表
+
+| CDC 数据类型                   | CDC 内部类型                                               | Java 外部类型                                       |
+|--------------------------------|------------------------------------------------------------|-----------------------------------------------------|
+| BOOLEAN                        | `java.lang.Boolean`                                        | `java.lang.Boolean`                                 |
+| TINYINT                        | `java.lang.Byte`                                           | `java.lang.Byte`                                    |
+| SMALLINT                       | `java.lang.Short`                                          | `java.lang.Short`                                   |
+| INTEGER                        | `java.lang.Integer`                                        | `java.lang.Integer`                                 |
+| BIGINT                         | `java.lang.Long`                                           | `java.lang.Long`                                    |
+| FLOAT                          | `java.lang.Float`                                          | `java.lang.Float`                                   |
+| DOUBLE                         | `java.lang.Double`                                         | `java.lang.Double`                                  |
+| DECIMAL                        | `org.apache.flink.cdc.common.data.DecimalData`             | `java.math.BigDecimal`                              |
+| DATE                           | `org.apache.flink.cdc.common.data.DateData`                | `java.time.LocalDate`                               |
+| TIME                           | `org.apache.flink.cdc.common.data.TimeData`                | `java.time.LocalTime`                               |
+| TIMESTAMP                      | `org.apache.flink.cdc.common.data.TimestampData`           | `java.time.LocalDateTime`                           |
+| TIMESTAMP_TZ                   | `org.apache.flink.cdc.common.data.ZonedTimestampData`      | `java.time.ZonedDateTime`                           |
+| TIMESTAMP_LTZ                  | `org.apache.flink.cdc.common.data.LocalZonedTimestampData` | `java.time.Instant`                                 |
+| CHAR<br/>VARCHAR<br/>STRING    | `org.apache.flink.cdc.common.data.StringData`              | `java.lang.String`                                  |
+| BINARY<br/>VARBINARY<br/>BYTES | `byte[]`                                                   | `byte[]`                                            |
+| ARRAY                          | `org.apache.flink.cdc.common.data.ArrayData`               | `java.util.List<T>`                                 |
+| MAP                            | `org.apache.flink.cdc.common.data.MapData`                 | `java.util.Map<K, V>`                               |
+| ROW                            | `org.apache.flink.cdc.common.data.RecordData`              | `java.util.List<Object>`                            |
+| VARIANT                        | `org.apache.flink.cdc.common.types.variant.Variant`        | `org.apache.flink.cdc.common.types.variant.Variant` |
diff --git a/docs/content/docs/core-concept/type-mappings.md
b/docs/content/docs/core-concept/type-mappings.md
new file mode 100644
index 000000000..a02f9b3ba
--- /dev/null
+++ b/docs/content/docs/core-concept/type-mappings.md
@@ -0,0 +1,62 @@
+---
+title: "Type Mappings"
+weight: 8
+type: docs
+aliases:
+ - /core-concept/type-mappings/
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Type Mappings
+
+For each CDC data type (a subclass of `org.apache.flink.cdc.common.types.DataType`), we define a CDC internal type, used for internal serialization and deserialization, and an external Java class, used for type merging, casting, and UDF evaluation.
+
+## Internal and External Types
+
+Some primitive types use the same representation for both the internal type and the external Java class (for example, `DataTypes.INT()` uses `java.lang.Integer` for both).
+Other types use different representations; for example, `DataTypes.TIMESTAMP` uses `org.apache.flink.cdc.common.data.TimestampData` as its internal representation and `java.time.LocalDateTime` for external operations.
+
+If you're writing a pipeline source / sink connector, each `DataChangeEvent` carries the internal type `RecordData`, and all of its fields are instances of internal types.
+
+If you're writing a UDF, declare its argument and return value types using the corresponding external Java classes.
+
+## Full Types List
+
+| CDC Data Type                  | CDC Internal Type                                          | External Java Class                                 |
+|--------------------------------|------------------------------------------------------------|-----------------------------------------------------|
+| BOOLEAN                        | `java.lang.Boolean`                                        | `java.lang.Boolean`                                 |
+| TINYINT                        | `java.lang.Byte`                                           | `java.lang.Byte`                                    |
+| SMALLINT                       | `java.lang.Short`                                          | `java.lang.Short`                                   |
+| INTEGER                        | `java.lang.Integer`                                        | `java.lang.Integer`                                 |
+| BIGINT                         | `java.lang.Long`                                           | `java.lang.Long`                                    |
+| FLOAT                          | `java.lang.Float`                                          | `java.lang.Float`                                   |
+| DOUBLE                         | `java.lang.Double`                                         | `java.lang.Double`                                  |
+| DECIMAL                        | `org.apache.flink.cdc.common.data.DecimalData`             | `java.math.BigDecimal`                              |
+| DATE                           | `org.apache.flink.cdc.common.data.DateData`                | `java.time.LocalDate`                               |
+| TIME                           | `org.apache.flink.cdc.common.data.TimeData`                | `java.time.LocalTime`                               |
+| TIMESTAMP                      | `org.apache.flink.cdc.common.data.TimestampData`           | `java.time.LocalDateTime`                           |
+| TIMESTAMP_TZ                   | `org.apache.flink.cdc.common.data.ZonedTimestampData`      | `java.time.ZonedDateTime`                           |
+| TIMESTAMP_LTZ                  | `org.apache.flink.cdc.common.data.LocalZonedTimestampData` | `java.time.Instant`                                 |
+| CHAR<br/>VARCHAR<br/>STRING    | `org.apache.flink.cdc.common.data.StringData`              | `java.lang.String`                                  |
+| BINARY<br/>VARBINARY<br/>BYTES | `byte[]`                                                   | `byte[]`                                            |
+| ARRAY                          | `org.apache.flink.cdc.common.data.ArrayData`               | `java.util.List<T>`                                 |
+| MAP                            | `org.apache.flink.cdc.common.data.MapData`                 | `java.util.Map<K, V>`                               |
+| ROW                            | `org.apache.flink.cdc.common.data.RecordData`              | `java.util.List<Object>`                            |
+| VARIANT                        | `org.apache.flink.cdc.common.types.variant.Variant`        | `org.apache.flink.cdc.common.types.variant.Variant` |
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/CommonConverter.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/CommonConverter.java
new file mode 100644
index 000000000..feb0e4459
--- /dev/null
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/CommonConverter.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.VariantType;
+import org.apache.flink.cdc.common.types.variant.Variant;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Some shared converters between Java objects and internal objects. None of
these functions are
+ * null-safe.
+ */
+public class CommonConverter {
+
+ // ----------------------
+ // These are shared converters used for both Internal and Java objects.
+ // ----------------------
+ static Boolean convertToBoolean(Object obj) {
+ if (obj instanceof Boolean) {
+ return (Boolean) obj;
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
BOOLEAN.");
+ }
+
+ static Byte convertToByte(Object obj) {
+ if (obj instanceof Byte) {
+ return (Byte) obj;
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
TINYINT.");
+ }
+
+ static Short convertToShort(Object obj) {
+ if (obj instanceof Short) {
+ return (Short) obj;
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
SMALLINT.");
+ }
+
+ static Integer convertToInt(Object obj) {
+ if (obj instanceof Integer) {
+ return (Integer) obj;
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
INT.");
+ }
+
+ static Long convertToLong(Object obj) {
+ if (obj instanceof Long) {
+ return (Long) obj;
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
BIGINT.");
+ }
+
+ static Float convertToFloat(Object obj) {
+ if (obj instanceof Float) {
+ return (Float) obj;
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
FLOAT.");
+ }
+
+ static Double convertToDouble(Object obj) {
+ if (obj instanceof Double) {
+ return (Double) obj;
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
DOUBLE.");
+ }
+
+ static byte[] convertToBinary(Object obj) {
+ if (obj instanceof byte[]) {
+ return (byte[]) obj;
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
BINARY.");
+ }
+
+ static Variant convertToVariant(Object obj, VariantType variantType) {
+ if (obj instanceof Variant) {
+ return (Variant) obj;
+ }
+ throw new RuntimeException(
+ "Cannot convert "
+ + obj
+ + " of type "
+ + obj.getClass()
+ + " to Variant ("
+ + variantType
+ + ").");
+ }
+
+ // ----------------------
+ // These are converters to CDC Internal objects.
+ // ----------------------
+
+ static StringData convertToStringData(Object obj) {
+ if (obj instanceof StringData) {
+ return (StringData) obj;
+ }
+ if (obj instanceof String) {
+ return BinaryStringData.fromString((String) obj);
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
STRING DATA.");
+ }
+
+ static DecimalData convertToDecimalData(Object obj) {
+ if (obj instanceof DecimalData) {
+ return (DecimalData) obj;
+ }
+ if (obj instanceof BigDecimal) {
+ BigDecimal bd = (BigDecimal) obj;
+ return DecimalData.fromBigDecimal(bd, bd.precision(), bd.scale());
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
DECIMAL DATA.");
+ }
+
+ static DateData convertToDateData(Object obj) {
+ if (obj instanceof DateData) {
+ return (DateData) obj;
+ }
+ if (obj instanceof LocalDate) {
+ return DateData.fromLocalDate((LocalDate) obj);
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
DATE DATA.");
+ }
+
+ static TimeData convertToTimeData(Object obj) {
+ if (obj instanceof TimeData) {
+ return (TimeData) obj;
+ }
+ if (obj instanceof LocalTime) {
+ return TimeData.fromLocalTime((LocalTime) obj);
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
TIME DATA.");
+ }
+
+ static TimestampData convertToTimestampData(Object obj) {
+ if (obj instanceof TimestampData) {
+ return (TimestampData) obj;
+ }
+ if (obj instanceof LocalDateTime) {
+ return TimestampData.fromLocalDateTime((LocalDateTime) obj);
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
TIMESTAMP DATA.");
+ }
+
+ static ZonedTimestampData convertToZonedTimestampData(Object obj) {
+ if (obj instanceof ZonedTimestampData) {
+ return (ZonedTimestampData) obj;
+ }
+ if (obj instanceof ZonedDateTime) {
+ return ZonedTimestampData.fromZonedDateTime((ZonedDateTime) obj);
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
TIMESTAMP_TZ DATA.");
+ }
+
+ static LocalZonedTimestampData convertToLocalZonedTimestampData(Object
obj) {
+ if (obj instanceof LocalZonedTimestampData) {
+ return (LocalZonedTimestampData) obj;
+ }
+ if (obj instanceof Instant) {
+ return LocalZonedTimestampData.fromInstant((Instant) obj);
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
TIMESTAMP_LTZ DATA.");
+ }
+
+ static ArrayData convertToArrayData(Object obj, ArrayType arrayType) {
+ if (obj instanceof ArrayData) {
+ return (ArrayData) obj;
+ }
+ if (obj instanceof List) {
+ DataType elementType = arrayType.getElementType();
+ List<?> objects = (List<?>) obj;
+ List<Object> convertedObjects = new ArrayList<>(objects.size());
+ for (Object object : objects) {
+ convertedObjects.add(
+ InternalObjectConverter.convertToInternal(object,
elementType));
+ }
+ return new GenericArrayData(convertedObjects.toArray());
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
ARRAY DATA.");
+ }
+
+ static MapData convertToMapData(Object obj, MapType mapType) {
+ if (obj instanceof MapData) {
+ return (MapData) obj;
+ }
+ if (obj instanceof Map) {
+ DataType keyType = mapType.getKeyType();
+ DataType valueType = mapType.getValueType();
+ Map<?, ?> map = (Map<?, ?>) obj;
+ Map<Object, Object> convertedMap = new HashMap<>(map.size());
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ Object key =
InternalObjectConverter.convertToInternal(entry.getKey(), keyType);
+ Object value =
+
InternalObjectConverter.convertToInternal(entry.getValue(), valueType);
+ convertedMap.put(key, value);
+ }
+ return new GenericMapData(convertedMap);
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
MAP DATA.");
+ }
+
+ static RecordData convertToRowData(Object obj, RowType rowType) {
+ if (obj instanceof RecordData) {
+ return (RecordData) obj;
+ }
+ if (obj instanceof List) {
+ List<DataType> dataTypes = rowType.getFieldTypes();
+ List<?> objects = (List<?>) obj;
+ List<Object> convertedObjects = new ArrayList<>(objects.size());
+ Preconditions.checkArgument(
+ objects.size() == dataTypes.size(),
+ "Cannot convert "
+ + obj
+ + " of type "
+ + obj.getClass()
+ + " with different arity.");
+ for (int i = 0; i < objects.size(); i++) {
+ convertedObjects.add(
+ InternalObjectConverter.convertToInternal(
+ objects.get(i), dataTypes.get(i)));
+ }
+ return GenericRecordData.of(convertedObjects.toArray());
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
ROW DATA.");
+ }
+
+ // ----------------------
+ // These are converters to Java objects.
+ // ----------------------
+
+ static String convertToString(Object obj) {
+ if (obj instanceof String) {
+ return (String) obj;
+ }
+ if (obj instanceof StringData) {
+ return obj.toString();
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
STRING.");
+ }
+
+ static BigDecimal convertToBigDecimal(Object obj) {
+ if (obj instanceof BigDecimal) {
+ return (BigDecimal) obj;
+ }
+ if (obj instanceof DecimalData) {
+ return ((DecimalData) obj).toBigDecimal();
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
DECIMAL.");
+ }
+
+ static LocalDate convertToLocalDate(Object obj) {
+ if (obj instanceof LocalDate) {
+ return (LocalDate) obj;
+ }
+ if (obj instanceof DateData) {
+ return ((DateData) obj).toLocalDate();
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
LOCAL DATE.");
+ }
+
+ static LocalTime convertToLocalTime(Object obj) {
+ if (obj instanceof LocalTime) {
+ return (LocalTime) obj;
+ }
+ if (obj instanceof TimeData) {
+ return ((TimeData) obj).toLocalTime();
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
LOCAL TIME.");
+ }
+
+ static LocalDateTime convertToLocalDateTime(Object obj) {
+ if (obj instanceof LocalDateTime) {
+ return (LocalDateTime) obj;
+ }
+ if (obj instanceof TimestampData) {
+ return ((TimestampData) obj).toLocalDateTime();
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
LOCAL DATETIME.");
+ }
+
+ static ZonedDateTime convertToZonedDateTime(Object obj) {
+ if (obj instanceof ZonedDateTime) {
+ return (ZonedDateTime) obj;
+ }
+ if (obj instanceof ZonedTimestampData) {
+ return ((ZonedTimestampData) obj).getZonedDateTime();
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
ZONED DATETIME.");
+ }
+
+ static Instant convertToInstant(Object obj) {
+ if (obj instanceof Instant) {
+ return (Instant) obj;
+ }
+ if (obj instanceof LocalZonedTimestampData) {
+ return ((LocalZonedTimestampData) obj).toInstant();
+ }
+ throw new RuntimeException(
+ "Cannot convert " + obj + " of type " + obj.getClass() + " to
INSTANT.");
+ }
+
+ static List<?> convertToList(Object obj, ArrayType arrayType) {
+ if (obj instanceof List) {
+ return (List<?>) obj;
+ }
+ if (obj instanceof ArrayData) {
+ ArrayData arrayData = (ArrayData) obj;
+ DataType elementType = arrayType.getElementType();
+ ArrayData.ElementGetter elementGetter =
ArrayData.createElementGetter(elementType);
+ List<Object> convertedObjects = new ArrayList<>(arrayData.size());
+ for (int i = 0; i < arrayData.size(); i++) {
+ convertedObjects.add(
+ JavaObjectConverter.convertToJava(
+ elementGetter.getElementOrNull(arrayData, i),
elementType));
+ }
+ return convertedObjects;
+ }
+ throw new RuntimeException(
+ "Cannot convert "
+ + obj
+ + " of type "
+ + obj.getClass()
+ + " to LIST ("
+ + arrayType
+ + ").");
+ }
+
+ static Map<?, ?> convertToMap(Object obj, MapType mapType) {
+ if (obj instanceof Map) {
+ return (Map<?, ?>) obj;
+ }
+ if (obj instanceof MapData) {
+ MapData mapData = (MapData) obj;
+ DataType keyType = mapType.getKeyType();
+ DataType valueType = mapType.getValueType();
+ ArrayData keyArray = mapData.keyArray();
+ ArrayData valueArray = mapData.valueArray();
+ List<?> keyObjects = convertToList(keyArray, new
ArrayType(keyType));
+ List<?> valueObjects = convertToList(valueArray, new
ArrayType(valueType));
+ Map<Object, Object> convertedMap = new HashMap<>(mapData.size());
+ for (int i = 0; i < mapData.size(); i++) {
+ convertedMap.put(keyObjects.get(i), valueObjects.get(i));
+ }
+ return convertedMap;
+ }
+ throw new RuntimeException(
+ "Cannot convert "
+ + obj
+ + " of type "
+ + obj.getClass()
+ + " to MAP ("
+ + mapType
+ + ").");
+ }
+
+ static List<?> convertToRow(Object obj, RowType rowType) {
+ if (obj instanceof List) {
+ return (List<?>) obj;
+ }
+ if (obj instanceof RecordData) {
+ RecordData recordData = (RecordData) obj;
+ List<DataType> dataTypes = rowType.getFieldTypes();
+ List<RecordData.FieldGetter> fieldGetters =
+ SchemaUtils.createFieldGetters(dataTypes.toArray(new
DataType[0]));
+ List<Object> objects = new ArrayList<>(recordData.getArity());
+ for (int i = 0; i < fieldGetters.size(); i++) {
+ objects.add(
+ JavaObjectConverter.convertToJava(
+
fieldGetters.get(i).getFieldOrNull(recordData), dataTypes.get(i)));
+ }
+ return objects;
+ }
+ throw new RuntimeException(
+ "Cannot convert "
+ + obj
+ + " of type "
+ + obj.getClass()
+ + " to ROW ("
+ + rowType
+ + ").");
+ }
+}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalClassConverter.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalClassConverter.java
new file mode 100644
index 000000000..fd860f5c9
--- /dev/null
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalClassConverter.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.BooleanType;
+import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeVisitor;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.IntType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.VariantType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.cdc.common.types.variant.Variant;
+
+/**
+ * Maps a {@link org.apache.flink.cdc.common.types.DataType} to the Java class of its CDC internal
+ * representation.
+ *
+ * <p>Implemented as a {@link DataTypeVisitor} whose shared instance is {@link #INSTANCE}; most
+ * callers only need {@link #toInternalClass(DataType)}.
+ */
+public class InternalClassConverter implements DataTypeVisitor<Class<?>> {
+
+    /** The only instance; the constructor is private. */
+    public static final InternalClassConverter INSTANCE = new InternalClassConverter();
+
+    private InternalClassConverter() {
+        // Singleton: use INSTANCE or toInternalClass(DataType).
+    }
+
+    /**
+     * Resolves the internal class for {@code dataType} by dispatching to the matching {@code
+     * visit} overload.
+     *
+     * @param dataType the CDC data type to look up
+     * @return the class used to represent values of {@code dataType} internally
+     */
+    public static Class<?> toInternalClass(DataType dataType) {
+        return dataType.accept(INSTANCE);
+    }
+
+    // ----------------------
+    // Character strings are represented by StringData.
+    // ----------------------
+
+    @Override
+    public Class<?> visit(CharType charType) {
+        return StringData.class;
+    }
+
+    @Override
+    public Class<?> visit(VarCharType varCharType) {
+        return StringData.class;
+    }
+
+    // ----------------------
+    // Binary strings are represented by raw byte arrays.
+    // ----------------------
+
+    @Override
+    public Class<?> visit(BinaryType binaryType) {
+        return byte[].class;
+    }
+
+    @Override
+    public Class<?> visit(VarBinaryType varBinaryType) {
+        return byte[].class;
+    }
+
+    // ----------------------
+    // Booleans and numerics; all but DECIMAL use the boxed Java type.
+    // ----------------------
+
+    @Override
+    public Class<?> visit(BooleanType booleanType) {
+        return Boolean.class;
+    }
+
+    @Override
+    public Class<?> visit(TinyIntType tinyIntType) {
+        return Byte.class;
+    }
+
+    @Override
+    public Class<?> visit(SmallIntType smallIntType) {
+        return Short.class;
+    }
+
+    @Override
+    public Class<?> visit(IntType intType) {
+        return Integer.class;
+    }
+
+    @Override
+    public Class<?> visit(BigIntType bigIntType) {
+        return Long.class;
+    }
+
+    @Override
+    public Class<?> visit(FloatType floatType) {
+        return Float.class;
+    }
+
+    @Override
+    public Class<?> visit(DoubleType doubleType) {
+        return Double.class;
+    }
+
+    @Override
+    public Class<?> visit(DecimalType decimalType) {
+        return DecimalData.class;
+    }
+
+    // ----------------------
+    // Date and time types each have a dedicated data class.
+    // ----------------------
+
+    @Override
+    public Class<?> visit(DateType dateType) {
+        return DateData.class;
+    }
+
+    @Override
+    public Class<?> visit(TimeType timeType) {
+        return TimeData.class;
+    }
+
+    @Override
+    public Class<?> visit(TimestampType timestampType) {
+        return TimestampData.class;
+    }
+
+    @Override
+    public Class<?> visit(ZonedTimestampType zonedTimestampType) {
+        return ZonedTimestampData.class;
+    }
+
+    @Override
+    public Class<?> visit(LocalZonedTimestampType localZonedTimestampType) {
+        return LocalZonedTimestampData.class;
+    }
+
+    // ----------------------
+    // Composite and semi-structured types.
+    // ----------------------
+
+    @Override
+    public Class<?> visit(ArrayType arrayType) {
+        return ArrayData.class;
+    }
+
+    @Override
+    public Class<?> visit(MapType mapType) {
+        return MapData.class;
+    }
+
+    @Override
+    public Class<?> visit(RowType rowType) {
+        return RecordData.class;
+    }
+
+    @Override
+    public Class<?> visit(VariantType variantType) {
+        return Variant.class;
+    }
+}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalObjectConverter.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalObjectConverter.java
new file mode 100644
index 000000000..7f1cc97bd
--- /dev/null
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalObjectConverter.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.BooleanType;
+import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeVisitor;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.IntType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.VariantType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+
+import java.util.function.Function;
+
+/**
+ * Entry point for converting Java objects into CDC internal objects, driven by the target {@link
+ * DataType}.
+ */
+public class InternalObjectConverter {
+
+    /** Shared visitor that selects the conversion function for a given data type. */
+    private static final ToInternalObjectConverter VISITOR = new ToInternalObjectConverter();
+
+    /**
+     * Visitor that yields, for each {@link DataType}, the {@link CommonConverter} routine used to
+     * build the internal value of that type.
+     */
+    static class ToInternalObjectConverter implements DataTypeVisitor<Function<Object, ?>> {
+
+        // ---- character strings: both delegate to CommonConverter#convertToStringData ----
+
+        @Override
+        public Function<Object, StringData> visit(CharType type) {
+            return CommonConverter::convertToStringData;
+        }
+
+        @Override
+        public Function<Object, StringData> visit(VarCharType type) {
+            return CommonConverter::convertToStringData;
+        }
+
+        // ---- boolean and binary ----
+
+        @Override
+        public Function<Object, Boolean> visit(BooleanType type) {
+            return CommonConverter::convertToBoolean;
+        }
+
+        @Override
+        public Function<Object, byte[]> visit(BinaryType type) {
+            return CommonConverter::convertToBinary;
+        }
+
+        @Override
+        public Function<Object, byte[]> visit(VarBinaryType type) {
+            return CommonConverter::convertToBinary;
+        }
+
+        // ---- numerics ----
+
+        @Override
+        public Function<Object, DecimalData> visit(DecimalType type) {
+            return CommonConverter::convertToDecimalData;
+        }
+
+        @Override
+        public Function<Object, Byte> visit(TinyIntType type) {
+            return CommonConverter::convertToByte;
+        }
+
+        @Override
+        public Function<Object, Short> visit(SmallIntType type) {
+            return CommonConverter::convertToShort;
+        }
+
+        @Override
+        public Function<Object, Integer> visit(IntType type) {
+            return CommonConverter::convertToInt;
+        }
+
+        @Override
+        public Function<Object, Long> visit(BigIntType type) {
+            return CommonConverter::convertToLong;
+        }
+
+        @Override
+        public Function<Object, Float> visit(FloatType type) {
+            return CommonConverter::convertToFloat;
+        }
+
+        @Override
+        public Function<Object, Double> visit(DoubleType type) {
+            return CommonConverter::convertToDouble;
+        }
+
+        // ---- date and time ----
+
+        @Override
+        public Function<Object, DateData> visit(DateType type) {
+            return CommonConverter::convertToDateData;
+        }
+
+        @Override
+        public Function<Object, TimeData> visit(TimeType type) {
+            return CommonConverter::convertToTimeData;
+        }
+
+        @Override
+        public Function<Object, TimestampData> visit(TimestampType type) {
+            return CommonConverter::convertToTimestampData;
+        }
+
+        @Override
+        public Function<Object, ZonedTimestampData> visit(ZonedTimestampType type) {
+            return CommonConverter::convertToZonedTimestampData;
+        }
+
+        @Override
+        public Function<Object, LocalZonedTimestampData> visit(LocalZonedTimestampType type) {
+            return CommonConverter::convertToLocalZonedTimestampData;
+        }
+
+        // ---- types whose conversion also needs the visited type itself ----
+
+        @Override
+        public Function<Object, ArrayData> visit(ArrayType type) {
+            return value -> CommonConverter.convertToArrayData(value, type);
+        }
+
+        @Override
+        public Function<Object, MapData> visit(MapType type) {
+            return value -> CommonConverter.convertToMapData(value, type);
+        }
+
+        @Override
+        public Function<Object, RecordData> visit(RowType type) {
+            return value -> CommonConverter.convertToRowData(value, type);
+        }
+
+        @Override
+        public Function<Object, ?> visit(VariantType type) {
+            return value -> CommonConverter.convertToVariant(value, type);
+        }
+    }
+
+    /**
+     * Converts {@code obj} with the conversion function selected for {@code dataType}.
+     *
+     * @param obj the value to convert; {@code null} is passed through unchanged
+     * @param dataType the type that decides which conversion is applied
+     * @return the converted value, or {@code null} when {@code obj} is {@code null}
+     */
+    public static Object convertToInternal(Object obj, DataType dataType) {
+        return obj == null ? null : dataType.accept(VISITOR).apply(obj);
+    }
+}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaClassConverter.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaClassConverter.java
new file mode 100644
index 000000000..2a1ffbfea
--- /dev/null
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaClassConverter.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.BooleanType;
+import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeVisitor;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.IntType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.VariantType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.cdc.common.types.variant.Variant;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Maps a {@link org.apache.flink.cdc.common.types.DataType} to the Java class that represents it,
+ * which could be used in UDF manipulation.
+ */
+public class JavaClassConverter implements DataTypeVisitor<Class<?>> {
+
+    /** The only instance; the constructor is private. */
+    public static final JavaClassConverter INSTANCE = new JavaClassConverter();
+
+    private JavaClassConverter() {
+        // No instantiation.
+    }
+
+    /**
+     * Looks up the Java class for the given data type by letting it accept {@link #INSTANCE}.
+     *
+     * @param dataType the type to map
+     * @return the Java class returned by the matching {@code visit} overload
+     */
+    public static Class<?> toJavaClass(DataType dataType) {
+        return dataType.accept(INSTANCE);
+    }
+
+    // ---- character strings ----
+
+    @Override
+    public Class<?> visit(CharType type) {
+        return String.class;
+    }
+
+    @Override
+    public Class<?> visit(VarCharType type) {
+        return String.class;
+    }
+
+    // ---- boolean and binary ----
+
+    @Override
+    public Class<?> visit(BooleanType type) {
+        return Boolean.class;
+    }
+
+    @Override
+    public Class<?> visit(BinaryType type) {
+        return byte[].class;
+    }
+
+    @Override
+    public Class<?> visit(VarBinaryType type) {
+        return byte[].class;
+    }
+
+    // ---- numerics ----
+
+    @Override
+    public Class<?> visit(DecimalType type) {
+        return BigDecimal.class;
+    }
+
+    @Override
+    public Class<?> visit(TinyIntType type) {
+        return Byte.class;
+    }
+
+    @Override
+    public Class<?> visit(SmallIntType type) {
+        return Short.class;
+    }
+
+    @Override
+    public Class<?> visit(IntType type) {
+        return Integer.class;
+    }
+
+    @Override
+    public Class<?> visit(BigIntType type) {
+        return Long.class;
+    }
+
+    @Override
+    public Class<?> visit(FloatType type) {
+        return Float.class;
+    }
+
+    @Override
+    public Class<?> visit(DoubleType type) {
+        return Double.class;
+    }
+
+    // ---- date and time: java.time classes ----
+
+    @Override
+    public Class<?> visit(DateType type) {
+        return LocalDate.class;
+    }
+
+    @Override
+    public Class<?> visit(TimeType type) {
+        return LocalTime.class;
+    }
+
+    @Override
+    public Class<?> visit(TimestampType type) {
+        return LocalDateTime.class;
+    }
+
+    @Override
+    public Class<?> visit(ZonedTimestampType type) {
+        return ZonedDateTime.class;
+    }
+
+    @Override
+    public Class<?> visit(LocalZonedTimestampType type) {
+        return Instant.class;
+    }
+
+    // ---- composite types: arrays and rows both map to List ----
+
+    @Override
+    public Class<?> visit(ArrayType type) {
+        return List.class;
+    }
+
+    @Override
+    public Class<?> visit(MapType type) {
+        return Map.class;
+    }
+
+    @Override
+    public Class<?> visit(RowType type) {
+        return List.class;
+    }
+
+    @Override
+    public Class<?> visit(VariantType type) {
+        return Variant.class;
+    }
+}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaObjectConverter.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaObjectConverter.java
new file mode 100644
index 000000000..ddb848a16
--- /dev/null
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaObjectConverter.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.BooleanType;
+import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeVisitor;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.IntType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.VariantType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Entry point for converting CDC internal objects into Java objects, driven by the value's
+ * {@link DataType}.
+ */
+public class JavaObjectConverter {
+
+    /** Shared visitor that selects the conversion function for a given data type. */
+    private static final ToJavaObjectConverter VISITOR = new ToJavaObjectConverter();
+
+    /**
+     * Visitor that yields, for each {@link DataType}, the {@link CommonConverter} routine used to
+     * build the Java value of that type.
+     */
+    static class ToJavaObjectConverter implements DataTypeVisitor<Function<Object, ?>> {
+
+        // ---- character strings: both delegate to CommonConverter#convertToString ----
+
+        @Override
+        public Function<Object, String> visit(CharType type) {
+            return CommonConverter::convertToString;
+        }
+
+        @Override
+        public Function<Object, String> visit(VarCharType type) {
+            return CommonConverter::convertToString;
+        }
+
+        // ---- boolean and binary ----
+
+        @Override
+        public Function<Object, Boolean> visit(BooleanType type) {
+            return CommonConverter::convertToBoolean;
+        }
+
+        @Override
+        public Function<Object, byte[]> visit(BinaryType type) {
+            return CommonConverter::convertToBinary;
+        }
+
+        @Override
+        public Function<Object, byte[]> visit(VarBinaryType type) {
+            return CommonConverter::convertToBinary;
+        }
+
+        // ---- numerics ----
+
+        @Override
+        public Function<Object, BigDecimal> visit(DecimalType type) {
+            return CommonConverter::convertToBigDecimal;
+        }
+
+        @Override
+        public Function<Object, Byte> visit(TinyIntType type) {
+            return CommonConverter::convertToByte;
+        }
+
+        @Override
+        public Function<Object, Short> visit(SmallIntType type) {
+            return CommonConverter::convertToShort;
+        }
+
+        @Override
+        public Function<Object, Integer> visit(IntType type) {
+            return CommonConverter::convertToInt;
+        }
+
+        @Override
+        public Function<Object, Long> visit(BigIntType type) {
+            return CommonConverter::convertToLong;
+        }
+
+        @Override
+        public Function<Object, Float> visit(FloatType type) {
+            return CommonConverter::convertToFloat;
+        }
+
+        @Override
+        public Function<Object, Double> visit(DoubleType type) {
+            return CommonConverter::convertToDouble;
+        }
+
+        // ---- date and time ----
+
+        @Override
+        public Function<Object, LocalDate> visit(DateType type) {
+            return CommonConverter::convertToLocalDate;
+        }
+
+        @Override
+        public Function<Object, LocalTime> visit(TimeType type) {
+            return CommonConverter::convertToLocalTime;
+        }
+
+        @Override
+        public Function<Object, LocalDateTime> visit(TimestampType type) {
+            return CommonConverter::convertToLocalDateTime;
+        }
+
+        @Override
+        public Function<Object, ZonedDateTime> visit(ZonedTimestampType type) {
+            return CommonConverter::convertToZonedDateTime;
+        }
+
+        @Override
+        public Function<Object, Instant> visit(LocalZonedTimestampType type) {
+            return CommonConverter::convertToInstant;
+        }
+
+        // ---- types whose conversion also needs the visited type itself ----
+
+        @Override
+        public Function<Object, List<?>> visit(ArrayType type) {
+            return value -> CommonConverter.convertToList(value, type);
+        }
+
+        @Override
+        public Function<Object, Map<?, ?>> visit(MapType type) {
+            return value -> CommonConverter.convertToMap(value, type);
+        }
+
+        @Override
+        public Function<Object, List<?>> visit(RowType type) {
+            return value -> CommonConverter.convertToRow(value, type);
+        }
+
+        @Override
+        public Function<Object, ?> visit(VariantType type) {
+            return value -> CommonConverter.convertToVariant(value, type);
+        }
+    }
+
+    /**
+     * Converts {@code obj} with the conversion function selected for {@code dataType}.
+     *
+     * @param obj the value to convert; {@code null} is passed through unchanged
+     * @param dataType the type that decides which conversion is applied
+     * @return the converted value, or {@code null} when {@code obj} is {@code null}
+     */
+    public static Object convertToJava(Object obj, DataType dataType) {
+        return obj == null ? null : dataType.accept(VISITOR).apply(obj);
+    }
+}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericArrayData.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericArrayData.java
index f87905c43..22a2b9336 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericArrayData.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericArrayData.java
@@ -341,4 +341,9 @@ public final class GenericArrayData implements ArrayData {
checkNoNull();
return ArrayUtils.toPrimitive((Double[]) array);
}
+
+ /** Renders the result of {@code toObjectArray()} in the string form of a list view over it. */
+ @Override
+ public String toString() {
+     return String.valueOf(Arrays.asList(toObjectArray()));
+ }
}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericRecordData.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericRecordData.java
new file mode 100644
index 000000000..87540674f
--- /dev/null
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericRecordData.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.data;
+
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.types.variant.Variant;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A general-purpose implementation of {@link RecordData} that keeps its field values in a plain
+ * array of Java {@link Object}s.
+ *
+ * <p>A {@link GenericRecordData} can hold an arbitrary number of fields of different types.
+ * Fields are addressed by position (0-based), either through the untyped {@link #getField(int)} /
+ * {@link #setField(int, Object)} pair or through the type-specific getters (such as {@link
+ * #getInt(int)}). The type-specific getters only cast the stored value.
+ *
+ * <p>Note: Every field of this data structure must be an internal data structure. See {@link
+ * RecordData} for more information about internal data structures.
+ *
+ * <p>A field may be null to represent nullability.
+ */
+@PublicEvolving
+public final class GenericRecordData implements RecordData {
+
+    /** Backing storage for the field values, in internal format. */
+    private final Object[] fields;
+
+    /**
+     * Creates a {@link GenericRecordData} with the given number of fields, all initially null.
+     *
+     * <p>Note: Every field of the row must be an internal data structure.
+     *
+     * @param arity number of fields
+     */
+    public GenericRecordData(int arity) {
+        fields = new Object[arity];
+    }
+
+    /**
+     * Stores {@code value} at position {@code pos}.
+     *
+     * <p>Note: The value must be an internal data structure. Otherwise the {@link
+     * GenericRecordData} is corrupted and may throw an exception during processing. See {@link
+     * RecordData} for more information about internal data structures.
+     *
+     * <p>The value may be null to represent nullability.
+     */
+    public void setField(int pos, Object value) {
+        fields[pos] = value;
+    }
+
+    /**
+     * Returns the value stored at position {@code pos}, without any cast.
+     *
+     * <p>Note: The returned value is an internal data structure. See {@link RecordData} for more
+     * information about internal data structures.
+     *
+     * <p>The returned value may be null to represent nullability.
+     */
+    public Object getField(int pos) {
+        return fields[pos];
+    }
+
+    @Override
+    public int getArity() {
+        return fields.length;
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return fields[pos] == null;
+    }
+
+    // ------------------------------------------------------------------------------------
+    // Primitive getters: cast to the wrapper type, then unbox on return.
+    // ------------------------------------------------------------------------------------
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return (Boolean) fields[pos];
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return (Byte) fields[pos];
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return (Short) fields[pos];
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return (Integer) fields[pos];
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return (Long) fields[pos];
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return (Float) fields[pos];
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return (Double) fields[pos];
+    }
+
+    // ------------------------------------------------------------------------------------
+    // Reference getters: plain casts. Precision, scale and numFields arguments are not used.
+    // ------------------------------------------------------------------------------------
+
+    @Override
+    public StringData getString(int pos) {
+        return (StringData) fields[pos];
+    }
+
+    @Override
+    public DecimalData getDecimal(int pos, int precision, int scale) {
+        return (DecimalData) fields[pos];
+    }
+
+    @Override
+    public TimestampData getTimestamp(int pos, int precision) {
+        return (TimestampData) fields[pos];
+    }
+
+    @Override
+    public ZonedTimestampData getZonedTimestamp(int pos, int precision) {
+        return (ZonedTimestampData) fields[pos];
+    }
+
+    @Override
+    public LocalZonedTimestampData getLocalZonedTimestampData(int pos, int precision) {
+        return (LocalZonedTimestampData) fields[pos];
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return (byte[]) fields[pos];
+    }
+
+    @Override
+    public ArrayData getArray(int pos) {
+        return (ArrayData) fields[pos];
+    }
+
+    @Override
+    public MapData getMap(int pos) {
+        return (MapData) fields[pos];
+    }
+
+    @Override
+    public RecordData getRow(int pos, int numFields) {
+        return (RecordData) fields[pos];
+    }
+
+    @Override
+    public DateData getDate(int pos) {
+        return (DateData) fields[pos];
+    }
+
+    @Override
+    public TimeData getTime(int pos) {
+        return (TimeData) fields[pos];
+    }
+
+    @Override
+    public Variant getVariant(int pos) {
+        return (Variant) fields[pos];
+    }
+
+    /** Two records are equal when the other is a {@link GenericRecordData} with deeply equal fields. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        return o instanceof GenericRecordData
+                && Arrays.deepEquals(fields, ((GenericRecordData) o).fields);
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.deepHashCode(fields);
+    }
+
+    /** Formats the record as its field strings joined by commas and wrapped in parentheses. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("(");
+        String separator = "";
+        for (Object field : fields) {
+            text.append(separator).append(Objects.toString(field));
+            separator = ",";
+        }
+        return text.append(")").toString();
+    }
+
+    // ----------------------------------------------------------------------------------------
+    // Utilities
+    // ----------------------------------------------------------------------------------------
+
+    /**
+     * Creates a {@link GenericRecordData} holding the given field values, in order.
+     *
+     * <p>Note: Every field of the row must be an internal data structure.
+     */
+    public static GenericRecordData of(Object... values) {
+        GenericRecordData row = new GenericRecordData(values.length);
+        // Copy into the record's own array so it does not share storage with the caller.
+        System.arraycopy(values, 0, row.fields, 0, values.length);
+        return row;
+    }
+}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
index eaea127cf..0d95f050b 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
@@ -59,7 +59,9 @@ import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.TinyIntType;
import org.apache.flink.cdc.common.types.VarBinaryType;
import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.VariantType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.cdc.common.types.variant.Variant;
import
org.apache.flink.shaded.guava31.com.google.common.collect.ArrayListMultimap;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
@@ -613,7 +615,7 @@ public class SchemaMergingUtils {
}
@VisibleForTesting
- static Object coerceObject(
+ public static Object coerceObject(
String timezone,
Object originalField,
DataType originalType,
@@ -733,6 +735,10 @@ public class SchemaMergingUtils {
return BinaryStringData.fromString(hexlify((byte[])
originalField));
}
+ if (originalField instanceof Variant) {
+ return BinaryStringData.fromString(((Variant)
originalField).toJson());
+ }
+
return BinaryStringData.fromString(originalField.toString());
}
@@ -1046,6 +1052,7 @@ public class SchemaMergingUtils {
mergingTree.put(RowType.class, ImmutableList.of(stringType));
mergingTree.put(ArrayType.class, ImmutableList.of(stringType));
mergingTree.put(MapType.class, ImmutableList.of(stringType));
+ mergingTree.put(VariantType.class, ImmutableList.of(stringType));
return mergingTree;
}
}
diff --git
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalClassConverterTest.java
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalClassConverterTest.java
new file mode 100644
index 000000000..55e78ddb0
--- /dev/null
+++
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalClassConverterTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test cases for {@link InternalClassConverter}. */
+class InternalClassConverterTest {
+
+    /**
+     * Feeds one sample of every supported data type through {@link
+     * InternalClassConverter#toInternalClass} and compares the canonical names of the resulting
+     * classes, in order.
+     */
+    @Test
+    void testConvertingFullTypes() {
+        assertThat(
+                        Stream.of(
+                                        // boolean and binary
+                                        DataTypes.BOOLEAN(),
+                                        DataTypes.BYTES(),
+                                        DataTypes.BINARY(10),
+                                        DataTypes.VARBINARY(10),
+                                        // character strings
+                                        DataTypes.CHAR(10),
+                                        DataTypes.VARCHAR(10),
+                                        DataTypes.STRING(),
+                                        // numerics
+                                        DataTypes.INT(),
+                                        DataTypes.TINYINT(),
+                                        DataTypes.SMALLINT(),
+                                        DataTypes.BIGINT(),
+                                        DataTypes.DOUBLE(),
+                                        DataTypes.FLOAT(),
+                                        DataTypes.DECIMAL(6, 3),
+                                        // date and time
+                                        DataTypes.DATE(),
+                                        DataTypes.TIME(),
+                                        DataTypes.TIME(6),
+                                        DataTypes.TIMESTAMP(),
+                                        DataTypes.TIMESTAMP(6),
+                                        DataTypes.TIMESTAMP_LTZ(),
+                                        DataTypes.TIMESTAMP_LTZ(6),
+                                        DataTypes.TIMESTAMP_TZ(),
+                                        DataTypes.TIMESTAMP_TZ(6),
+                                        // composite types
+                                        DataTypes.ARRAY(DataTypes.BIGINT()),
+                                        DataTypes.MAP(DataTypes.SMALLINT(), DataTypes.STRING()),
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("f1", DataTypes.STRING()),
+                                                DataTypes.FIELD("f2", DataTypes.STRING(), "desc")),
+                                        DataTypes.ROW(DataTypes.SMALLINT(), DataTypes.STRING()),
+                                        DataTypes.VARIANT())
+                                .map(
+                                        type ->
+                                                InternalClassConverter.toInternalClass(type)
+                                                        .getCanonicalName()))
+                .containsExactly(
+                        // boolean and binary
+                        "java.lang.Boolean",
+                        "byte[]",
+                        "byte[]",
+                        "byte[]",
+                        // character strings
+                        "org.apache.flink.cdc.common.data.StringData",
+                        "org.apache.flink.cdc.common.data.StringData",
+                        "org.apache.flink.cdc.common.data.StringData",
+                        // numerics
+                        "java.lang.Integer",
+                        "java.lang.Byte",
+                        "java.lang.Short",
+                        "java.lang.Long",
+                        "java.lang.Double",
+                        "java.lang.Float",
+                        "org.apache.flink.cdc.common.data.DecimalData",
+                        // date and time
+                        "org.apache.flink.cdc.common.data.DateData",
+                        "org.apache.flink.cdc.common.data.TimeData",
+                        "org.apache.flink.cdc.common.data.TimeData",
+                        "org.apache.flink.cdc.common.data.TimestampData",
+                        "org.apache.flink.cdc.common.data.TimestampData",
+                        "org.apache.flink.cdc.common.data.LocalZonedTimestampData",
+                        "org.apache.flink.cdc.common.data.LocalZonedTimestampData",
+                        "org.apache.flink.cdc.common.data.ZonedTimestampData",
+                        "org.apache.flink.cdc.common.data.ZonedTimestampData",
+                        // composite types
+                        "org.apache.flink.cdc.common.data.ArrayData",
+                        "org.apache.flink.cdc.common.data.MapData",
+                        "org.apache.flink.cdc.common.data.RecordData",
+                        "org.apache.flink.cdc.common.data.RecordData",
+                        "org.apache.flink.cdc.common.types.variant.Variant");
+    }
+}
diff --git
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalObjectConverterTest.java
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalObjectConverterTest.java
new file mode 100644
index 000000000..2a4c3e1b3
--- /dev/null
+++
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalObjectConverterTest.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.cdc.common.converter.InternalObjectConverter.convertToInternal;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test cases for {@link InternalObjectConverter}. */
+class InternalObjectConverterTest {
+
+ @Test
+ void testConvertToBoolean() {
+ assertThat(convertToInternal(true,
DataTypes.BOOLEAN())).isEqualTo(true);
+ assertThat(convertToInternal(false,
DataTypes.BOOLEAN())).isEqualTo(false);
+ assertThat(convertToInternal(null, DataTypes.BOOLEAN())).isNull();
+ }
+
+ // Checks that byte[] input targeting BYTES comes back as a byte[] with the same content,
+ // and that null input yields null.
+ @Test
+ void testConvertToBytes() {
+     // Pin the charset so the input bytes never depend on the platform default encoding.
+     assertThat(
+                     convertToInternal(
+                             "Alice".getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                             DataTypes.BYTES()))
+             .isInstanceOf(byte[].class)
+             .extracting(byte[].class::cast)
+             .extracting(Arrays::toString)
+             .isEqualTo("[65, 108, 105, 99, 101]");
+     assertThat(
+                     convertToInternal(
+                             new byte[] {(byte) 0xca, (byte) 0xfe, (byte) 0xba, (byte) 0xbe},
+                             DataTypes.BYTES()))
+             .isInstanceOf(byte[].class)
+             .extracting(byte[].class::cast)
+             .extracting(Arrays::toString)
+             .isEqualTo("[-54, -2, -70, -66]");
+     assertThat(convertToInternal(null, DataTypes.BYTES())).isNull();
+ }
+
+ // Checks that byte[] input targeting BINARY(n) comes back with the same content,
+ // and that null input yields null.
+ @Test
+ void testConvertToBinary() {
+     // Pin the charset so the input bytes never depend on the platform default encoding.
+     assertThat(
+                     convertToInternal(
+                             "Alice".getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                             DataTypes.BINARY(5)))
+             .isEqualTo(new byte[] {65, 108, 105, 99, 101});
+     assertThat(
+                     convertToInternal(
+                             new byte[] {(byte) 0xca, (byte) 0xfe, (byte) 0xba, (byte) 0xbe},
+                             DataTypes.BINARY(4)))
+             .isEqualTo(new byte[] {-54, -2, -70, -66});
+     assertThat(convertToInternal(null, DataTypes.BINARY(3))).isNull();
+ }
+
+ // Checks that byte[] input targeting VARBINARY(n) comes back with the same content,
+ // and that null input yields null.
+ @Test
+ void testConvertToVarBinary() {
+     // Pin the charset so the input bytes never depend on the platform default encoding.
+     assertThat(
+                     convertToInternal(
+                             "Alice".getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                             DataTypes.VARBINARY(5)))
+             .isEqualTo(new byte[] {65, 108, 105, 99, 101});
+     assertThat(
+                     convertToInternal(
+                             new byte[] {(byte) 0xca, (byte) 0xfe, (byte) 0xba, (byte) 0xbe},
+                             DataTypes.VARBINARY(4)))
+             .isEqualTo(new byte[] {-54, -2, -70, -66});
+     assertThat(convertToInternal(null, DataTypes.VARBINARY(3))).isNull();
+ }
+
+ @Test
+ void testConvertToChar() {
+ assertThat(convertToInternal("Alice", DataTypes.CHAR(5)))
+ .isInstanceOf(StringData.class)
+ .hasToString("Alice");
+
+ assertThat(convertToInternal(BinaryStringData.fromString("Bob"),
DataTypes.CHAR(5)))
+ .isInstanceOf(StringData.class)
+ .hasToString("Bob");
+
+ assertThat(convertToInternal(null, DataTypes.CHAR(5))).isNull();
+ }
+
+ @Test
+ void testConvertToVarChar() {
+ assertThat(convertToInternal("Alice", DataTypes.VARCHAR(5)))
+ .isInstanceOf(StringData.class)
+ .hasToString("Alice");
+
+ assertThat(convertToInternal(BinaryStringData.fromString("Bob"),
DataTypes.VARCHAR(5)))
+ .isInstanceOf(StringData.class)
+ .hasToString("Bob");
+
+ assertThat(convertToInternal(null, DataTypes.VARCHAR(5))).isNull();
+ }
+
+ @Test
+ void testConvertToString() {
+ assertThat(convertToInternal("Alice", DataTypes.STRING()))
+ .isInstanceOf(StringData.class)
+ .hasToString("Alice");
+ assertThat(convertToInternal(BinaryStringData.fromString("Bob"),
DataTypes.STRING()))
+ .isInstanceOf(StringData.class)
+ .hasToString("Bob");
+ assertThat(convertToInternal(null, DataTypes.STRING())).isNull();
+ }
+
+ @Test
+ void testConvertToInt() {
+ assertThat(convertToInternal(11, DataTypes.INT())).isEqualTo(11);
+ assertThat(convertToInternal(-14, DataTypes.INT())).isEqualTo(-14);
+ assertThat(convertToInternal(17, DataTypes.INT())).isEqualTo(17);
+ assertThat(convertToInternal(null, DataTypes.INT())).isNull();
+ }
+
+ @Test
+ void testConvertToTinyInt() {
+ assertThat(convertToInternal((byte) 11,
DataTypes.TINYINT())).isEqualTo((byte) 11);
+ assertThat(convertToInternal((byte) -14,
DataTypes.TINYINT())).isEqualTo((byte) -14);
+ assertThat(convertToInternal((byte) 17,
DataTypes.TINYINT())).isEqualTo((byte) 17);
+ assertThat(convertToInternal(null, DataTypes.TINYINT())).isNull();
+ }
+
+ @Test
+ void testConvertToSmallInt() {
+ assertThat(convertToInternal((short) 11,
DataTypes.SMALLINT())).isEqualTo((short) 11);
+ assertThat(convertToInternal((short) -14,
DataTypes.SMALLINT())).isEqualTo((short) -14);
+ assertThat(convertToInternal((short) 17,
DataTypes.SMALLINT())).isEqualTo((short) 17);
+ assertThat(convertToInternal(null, DataTypes.SMALLINT())).isNull();
+ }
+
+ @Test
+ void testConvertToBigInt() {
+ assertThat(convertToInternal((long) 11,
DataTypes.BIGINT())).isEqualTo((long) 11);
+ assertThat(convertToInternal((long) -14,
DataTypes.BIGINT())).isEqualTo((long) -14);
+ assertThat(convertToInternal((long) 17,
DataTypes.BIGINT())).isEqualTo((long) 17);
+ assertThat(convertToInternal(null, DataTypes.BIGINT())).isNull();
+ }
+
+ @Test
+ void testConvertToFloat() {
+ assertThat(convertToInternal((float) 11,
DataTypes.FLOAT())).isEqualTo((float) 11);
+ assertThat(convertToInternal((float) -14,
DataTypes.FLOAT())).isEqualTo((float) -14);
+ assertThat(convertToInternal((float) 17,
DataTypes.FLOAT())).isEqualTo((float) 17);
+ assertThat(convertToInternal(null, DataTypes.FLOAT())).isNull();
+ }
+
+ @Test
+ void testConvertToDouble() {
+ assertThat(convertToInternal((double) 11,
DataTypes.DOUBLE())).isEqualTo((double) 11);
+ assertThat(convertToInternal((double) -14,
DataTypes.DOUBLE())).isEqualTo((double) -14);
+ assertThat(convertToInternal((double) 17,
DataTypes.DOUBLE())).isEqualTo((double) 17);
+ assertThat(convertToInternal(null, DataTypes.DOUBLE())).isNull();
+ }
+
+ @Test
+ void testConvertToDecimal() {
+ assertThat(convertToInternal(new BigDecimal("4.2"),
DataTypes.DECIMAL(2, 1)))
+ .isInstanceOf(DecimalData.class)
+ .hasToString("4.2");
+ assertThat(convertToInternal(new BigDecimal("-3.1415926"),
DataTypes.DECIMAL(20, 10)))
+ .isInstanceOf(DecimalData.class)
+ .hasToString("-3.1415926");
+
+ assertThat(
+ convertToInternal(
+ DecimalData.fromUnscaledLong(42, 2, 1),
DataTypes.DECIMAL(2, 1)))
+ .isInstanceOf(DecimalData.class)
+ .hasToString("4.2");
+ assertThat(
+ convertToInternal(
+ DecimalData.fromUnscaledLong(-31415926, 14, 7),
+ DataTypes.DECIMAL(14, 7)))
+ .isInstanceOf(DecimalData.class)
+ .hasToString("-3.1415926");
+
+ assertThat(convertToInternal(null, DataTypes.DECIMAL(20,
10))).isNull();
+ }
+
+ @Test
+ void testConvertToDate() {
+ assertThat(convertToInternal(LocalDate.of(2017, 12, 31),
DataTypes.DATE()))
+ .isInstanceOf(DateData.class)
+ .hasToString("2017-12-31");
+ assertThat(convertToInternal(DateData.fromEpochDay(14417),
DataTypes.DATE()))
+ .isInstanceOf(DateData.class)
+ .hasToString("2009-06-22");
+ assertThat(convertToInternal(null, DataTypes.DATE())).isNull();
+ }
+
+ @Test
+ void testConvertToTime() {
+ assertThat(convertToInternal(LocalTime.of(21, 48, 25),
DataTypes.TIME(0)))
+ .isInstanceOf(TimeData.class)
+ .hasToString("21:48:25");
+ assertThat(convertToInternal(LocalTime.ofSecondOfDay(14419),
DataTypes.TIME(0)))
+ .isInstanceOf(TimeData.class)
+ .hasToString("04:00:19");
+ assertThat(convertToInternal(LocalTime.of(21, 48, 25, 123456789),
DataTypes.TIME(3)))
+ .isInstanceOf(TimeData.class)
+ .hasToString("21:48:25.123");
+ assertThat(convertToInternal(LocalTime.ofNanoOfDay(14419123456789L),
DataTypes.TIME(3)))
+ .isInstanceOf(TimeData.class)
+ .hasToString("04:00:19.123");
+
+ assertThat(
+ convertToInternal(
+ TimeData.fromLocalTime(LocalTime.of(21, 48,
25)),
+ DataTypes.TIME(0)))
+ .isInstanceOf(TimeData.class)
+ .hasToString("21:48:25");
+ assertThat(convertToInternal(TimeData.fromSecondOfDay(14419),
DataTypes.TIME(0)))
+ .isInstanceOf(TimeData.class)
+ .hasToString("04:00:19");
+ assertThat(
+ convertToInternal(
+ TimeData.fromLocalTime(LocalTime.of(21, 48,
25, 123456789)),
+ DataTypes.TIME(3)))
+ .isInstanceOf(TimeData.class)
+ .hasToString("21:48:25.123");
+ assertThat(convertToInternal(TimeData.fromNanoOfDay(14419123456789L),
DataTypes.TIME(3)))
+ .isInstanceOf(TimeData.class)
+ .hasToString("04:00:19.123");
+ assertThat(convertToInternal(null, DataTypes.TIME())).isNull();
+ }
+
+ @Test
+ void testConvertToTimestamp() {
+ assertThat(
+ convertToInternal(
+ TimestampData.fromMillis(2147483648491L),
DataTypes.TIMESTAMP(3)))
+ .isInstanceOf(TimestampData.class)
+ .hasToString("2038-01-19T03:14:08.491");
+ assertThat(
+ convertToInternal(
+ LocalDateTime.of(2019, 12, 25, 21, 48, 25,
123456789),
+ DataTypes.TIMESTAMP(9)))
+ .isInstanceOf(TimestampData.class)
+ .hasToString("2019-12-25T21:48:25.123456789");
+ assertThat(convertToInternal(null, DataTypes.TIMESTAMP())).isNull();
+ }
+
+ @Test
+ void testConvertToZonedTimestamp() {
+ assertThat(
+ convertToInternal(
+ ZonedTimestampData.of(2143658709L, 0, "UTC"),
+ DataTypes.TIMESTAMP_TZ(3)))
+ .isInstanceOf(ZonedTimestampData.class)
+ .hasToString("1970-01-25T19:27:38.709Z");
+ assertThat(
+ convertToInternal(
+ ZonedDateTime.of(
+ 2019,
+ 12,
+ 25,
+ 21,
+ 48,
+ 25,
+ 123456789,
+ ZoneId.of("UTC+08:00")),
+ DataTypes.TIMESTAMP_TZ(9)))
+ .isInstanceOf(ZonedTimestampData.class)
+ .hasToString("2019-12-25T21:48:25.123456789+08:00");
+ assertThat(convertToInternal(null, DataTypes.TIMESTAMP_TZ())).isNull();
+ }
+
+ @Test
+ void testConvertToLocalZonedTimestamp() {
+ assertThat(
+ convertToInternal(
+
LocalZonedTimestampData.fromEpochMillis(3141592653589L),
+ DataTypes.TIMESTAMP_LTZ(3)))
+ .isInstanceOf(LocalZonedTimestampData.class)
+ .hasToString("2069-07-21T00:37:33.589");
+ assertThat(
+ convertToInternal(
+ Instant.ofEpochSecond(2718281828L, 123456789),
+ DataTypes.TIMESTAMP_LTZ(9)))
+ .isInstanceOf(LocalZonedTimestampData.class)
+ .hasToString("2056-02-20T14:17:08.123456789");
+ assertThat(convertToInternal(null,
DataTypes.TIMESTAMP_LTZ())).isNull();
+ }
+
+ // Converts both a java.util.List and an already-internal GenericArrayData against
+ // ARRAY<STRING>, asserting the result is ArrayData whose elements read back as StringData.
+ @Test
+ void testConvertToArray() {
+     assertThat(
+                     convertToInternal(
+                             Arrays.asList("Alice", "Bob", "Charlie"),
+                             DataTypes.ARRAY(DataTypes.STRING())))
+             .isInstanceOf(ArrayData.class)
+             .hasToString("[Alice, Bob, Charlie]")
+             .extracting(ArrayData.class::cast)
+             .extracting(s -> s.getString(0))
+             .isInstanceOf(StringData.class);
+     assertThat(
+                     convertToInternal(
+                             new GenericArrayData(
+                                     new StringData[] {
+                                         BinaryStringData.fromString("Derrida"),
+                                         BinaryStringData.fromString("Enigma"),
+                                         BinaryStringData.fromString("Fall")
+                                     }),
+                             DataTypes.ARRAY(DataTypes.STRING())))
+             .isInstanceOf(ArrayData.class)
+             .hasToString("[Derrida, Enigma, Fall]")
+             .extracting(ArrayData.class::cast)
+             .extracting(s -> s.getString(0))
+             .isInstanceOf(StringData.class);
+     // The null case must target the ARRAY type under test; it previously used
+     // TIMESTAMP_LTZ (copy-paste), leaving the array null path uncovered.
+     assertThat(convertToInternal(null, DataTypes.ARRAY(DataTypes.STRING()))).isNull();
+ }
+
+ @Test
+ void testConvertToMap() {
+ MapType targetType =
+ DataTypes.MAP(
+ DataTypes.STRING(), DataTypes.ROW(DataTypes.INT(),
DataTypes.STRING()));
+ assertThat(
+ convertToInternal(
+ Map.of(
+ "Alice", List.of(5, "AILISI"),
+ "Bob", List.of(3, "BAOBO"),
+ "Cancan", List.of(6, "KANGKANG")),
+ targetType))
+ .isInstanceOf(MapData.class)
+ .hasToString("{Alice=(5,AILISI), Cancan=(6,KANGKANG),
Bob=(3,BAOBO)}")
+ .extracting(MapData.class::cast)
+ .extracting(MapData::keyArray, MapData::valueArray)
+ .map(Object::toString)
+ .containsExactly("[Alice, Cancan, Bob]", "[(5,AILISI),
(6,KANGKANG), (3,BAOBO)]");
+
+ assertThat(
+ convertToInternal(
+ new GenericMapData(
+ Map.of(
+
BinaryStringData.fromString("Derrida"),
+ GenericRecordData.of(
+ 7,
BinaryStringData.fromString("DELIDA")))),
+ targetType))
+ .isInstanceOf(MapData.class)
+ .hasToString("{Derrida=(7,DELIDA)}")
+ .extracting(MapData.class::cast)
+ .extracting(MapData::keyArray, MapData::valueArray)
+ .map(Object::toString)
+ .containsExactly("[Derrida]", "[(7,DELIDA)]");
+ assertThat(convertToInternal(null, targetType)).isNull();
+ }
+
+ @Test
+ void testConvertToRow() {
+ RowType targetType =
+ DataTypes.ROW(
+ DataTypes.STRING(), DataTypes.ROW(DataTypes.INT(),
DataTypes.STRING()));
+ assertThat(convertToInternal(List.of("Alice", List.of(5, "AILISI")),
targetType))
+ .isInstanceOf(RecordData.class)
+ .hasToString("(Alice,(5,AILISI))")
+ .extracting(RecordData.class::cast)
+ .extracting(o -> o.getString(0))
+ .isInstanceOf(StringData.class)
+ .hasToString("Alice");
+
+ assertThat(convertToInternal(List.of("Bob", List.of(3, "BAOBO")),
targetType))
+ .isInstanceOf(RecordData.class)
+ .hasToString("(Bob,(3,BAOBO))")
+ .extracting(RecordData.class::cast)
+ .extracting(o -> o.getRow(1, 2))
+ .isInstanceOf(RecordData.class)
+ .hasToString("(3,BAOBO)")
+ .extracting(o -> o.getString(1))
+ .isInstanceOf(StringData.class);
+
+ assertThat(convertToInternal(null, targetType)).isNull();
+ }
+}
diff --git
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/JavaClassConverterTest.java
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/JavaClassConverterTest.java
new file mode 100644
index 000000000..fb84bd7a7
--- /dev/null
+++
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/JavaClassConverterTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test cases for {@link JavaClassConverter}. */
+class JavaClassConverterTest {
+
+ @Test
+ void testConvertingFullTypes() {
+ assertThat(
+ Stream.of(
+ DataTypes.BOOLEAN(),
+ DataTypes.BYTES(),
+ DataTypes.BINARY(10),
+ DataTypes.VARBINARY(10),
+ DataTypes.CHAR(10),
+ DataTypes.VARCHAR(10),
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.TINYINT(),
+ DataTypes.SMALLINT(),
+ DataTypes.BIGINT(),
+ DataTypes.DOUBLE(),
+ DataTypes.FLOAT(),
+ DataTypes.DECIMAL(6, 3),
+ DataTypes.DATE(),
+ DataTypes.TIME(),
+ DataTypes.TIME(6),
+ DataTypes.TIMESTAMP(),
+ DataTypes.TIMESTAMP(6),
+ DataTypes.TIMESTAMP_LTZ(),
+ DataTypes.TIMESTAMP_LTZ(6),
+ DataTypes.TIMESTAMP_TZ(),
+ DataTypes.TIMESTAMP_TZ(6),
+ DataTypes.ARRAY(DataTypes.BIGINT()),
+ DataTypes.MAP(DataTypes.SMALLINT(),
DataTypes.STRING()),
+ DataTypes.ROW(
+ DataTypes.FIELD("f1",
DataTypes.STRING()),
+ DataTypes.FIELD("f2",
DataTypes.STRING(), "desc")),
+ DataTypes.ROW(DataTypes.SMALLINT(),
DataTypes.STRING()),
+ DataTypes.VARIANT()))
+ .map(JavaClassConverter::toJavaClass)
+ .map(Class::getCanonicalName)
+ .containsExactly(
+ "java.lang.Boolean",
+ "byte[]",
+ "byte[]",
+ "byte[]",
+ "java.lang.String",
+ "java.lang.String",
+ "java.lang.String",
+ "java.lang.Integer",
+ "java.lang.Byte",
+ "java.lang.Short",
+ "java.lang.Long",
+ "java.lang.Double",
+ "java.lang.Float",
+ "java.math.BigDecimal",
+ "java.time.LocalDate",
+ "java.time.LocalTime",
+ "java.time.LocalTime",
+ "java.time.LocalDateTime",
+ "java.time.LocalDateTime",
+ "java.time.Instant",
+ "java.time.Instant",
+ "java.time.ZonedDateTime",
+ "java.time.ZonedDateTime",
+ "java.util.List",
+ "java.util.Map",
+ "java.util.List",
+ "java.util.List",
+ "org.apache.flink.cdc.common.types.variant.Variant");
+ }
+}
diff --git
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/JavaObjectConverterTest.java
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/JavaObjectConverterTest.java
new file mode 100644
index 000000000..786ef63e2
--- /dev/null
+++
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/JavaObjectConverterTest.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.cdc.common.converter.JavaObjectConverter.convertToJava;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.InstanceOfAssertFactories.list;
+import static org.assertj.core.api.InstanceOfAssertFactories.map;
+
+/** Unit test cases for {@link JavaObjectConverter}. */
+class JavaObjectConverterTest {
+
+ @Test
+ void testConvertToBoolean() {
+ assertThat(convertToJava(true, DataTypes.BOOLEAN())).isEqualTo(true);
+ assertThat(convertToJava(false, DataTypes.BOOLEAN())).isEqualTo(false);
+ assertThat(convertToJava(null, DataTypes.BOOLEAN())).isNull();
+ }
+
+ // Checks that byte[] input targeting BYTES comes back as a byte[] with the same content,
+ // and that null input yields null.
+ @Test
+ void testConvertToBytes() {
+     // Pin the charset so the input bytes never depend on the platform default encoding.
+     assertThat(
+                     convertToJava(
+                             "Alice".getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                             DataTypes.BYTES()))
+             .isInstanceOf(byte[].class)
+             .extracting(byte[].class::cast)
+             .extracting(Arrays::toString)
+             .isEqualTo("[65, 108, 105, 99, 101]");
+     assertThat(
+                     convertToJava(
+                             new byte[] {(byte) 0xca, (byte) 0xfe, (byte) 0xba, (byte) 0xbe},
+                             DataTypes.BYTES()))
+             .isInstanceOf(byte[].class)
+             .extracting(byte[].class::cast)
+             .extracting(Arrays::toString)
+             .isEqualTo("[-54, -2, -70, -66]");
+     assertThat(convertToJava(null, DataTypes.BYTES())).isNull();
+ }
+
+ // Checks that byte[] input targeting BINARY(n) comes back with the same content,
+ // and that null input yields null.
+ @Test
+ void testConvertToBinary() {
+     // Pin the charset so the input bytes never depend on the platform default encoding.
+     assertThat(
+                     convertToJava(
+                             "Alice".getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                             DataTypes.BINARY(5)))
+             .isEqualTo(new byte[] {65, 108, 105, 99, 101});
+     assertThat(
+                     convertToJava(
+                             new byte[] {(byte) 0xca, (byte) 0xfe, (byte) 0xba, (byte) 0xbe},
+                             DataTypes.BINARY(4)))
+             .isEqualTo(new byte[] {-54, -2, -70, -66});
+     assertThat(convertToJava(null, DataTypes.BINARY(3))).isNull();
+ }
+
+ // Checks that byte[] input targeting VARBINARY(n) comes back with the same content,
+ // and that null input yields null.
+ @Test
+ void testConvertToVarBinary() {
+     // Pin the charset so the input bytes never depend on the platform default encoding.
+     assertThat(
+                     convertToJava(
+                             "Alice".getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                             DataTypes.VARBINARY(5)))
+             .isEqualTo(new byte[] {65, 108, 105, 99, 101});
+     assertThat(
+                     convertToJava(
+                             new byte[] {(byte) 0xca, (byte) 0xfe, (byte) 0xba, (byte) 0xbe},
+                             DataTypes.VARBINARY(4)))
+             .isEqualTo(new byte[] {-54, -2, -70, -66});
+     assertThat(convertToJava(null, DataTypes.VARBINARY(3))).isNull();
+ }
+
+ @Test
+ void testConvertToChar() {
+ assertThat(convertToJava("Alice", DataTypes.CHAR(5)))
+ .isInstanceOf(String.class)
+ .hasToString("Alice");
+
+ assertThat(convertToJava(BinaryStringData.fromString("Bob"),
DataTypes.CHAR(5)))
+ .isInstanceOf(String.class)
+ .hasToString("Bob");
+
+ assertThat(convertToJava(null, DataTypes.CHAR(5))).isNull();
+ }
+
+ @Test
+ void testConvertToVarChar() {
+ assertThat(convertToJava("Alice", DataTypes.VARCHAR(5)))
+ .isInstanceOf(String.class)
+ .hasToString("Alice");
+
+ assertThat(convertToJava(BinaryStringData.fromString("Bob"),
DataTypes.VARCHAR(5)))
+ .isInstanceOf(String.class)
+ .hasToString("Bob");
+
+ assertThat(convertToJava(null, DataTypes.VARCHAR(5))).isNull();
+ }
+
+ @Test
+ void testConvertToString() {
+ assertThat(convertToJava("Alice", DataTypes.STRING()))
+ .isInstanceOf(String.class)
+ .hasToString("Alice");
+ assertThat(convertToJava(BinaryStringData.fromString("Bob"),
DataTypes.STRING()))
+ .isInstanceOf(String.class)
+ .hasToString("Bob");
+ assertThat(convertToJava(null, DataTypes.STRING())).isNull();
+ }
+
+ @Test
+ void testConvertToInt() {
+ assertThat(convertToJava(11, DataTypes.INT())).isEqualTo(11);
+ assertThat(convertToJava(-14, DataTypes.INT())).isEqualTo(-14);
+ assertThat(convertToJava(17, DataTypes.INT())).isEqualTo(17);
+ assertThat(convertToJava(null, DataTypes.INT())).isNull();
+ }
+
+ @Test
+ void testConvertToTinyInt() {
+ assertThat(convertToJava((byte) 11,
DataTypes.TINYINT())).isEqualTo((byte) 11);
+ assertThat(convertToJava((byte) -14,
DataTypes.TINYINT())).isEqualTo((byte) -14);
+ assertThat(convertToJava((byte) 17,
DataTypes.TINYINT())).isEqualTo((byte) 17);
+ assertThat(convertToJava(null, DataTypes.TINYINT())).isNull();
+ }
+
+ @Test
+ void testConvertToSmallInt() {
+ assertThat(convertToJava((short) 11,
DataTypes.SMALLINT())).isEqualTo((short) 11);
+ assertThat(convertToJava((short) -14,
DataTypes.SMALLINT())).isEqualTo((short) -14);
+ assertThat(convertToJava((short) 17,
DataTypes.SMALLINT())).isEqualTo((short) 17);
+ assertThat(convertToJava(null, DataTypes.SMALLINT())).isNull();
+ }
+
+ @Test
+ void testConvertToBigInt() {
+ assertThat(convertToJava((long) 11,
DataTypes.BIGINT())).isEqualTo((long) 11);
+ assertThat(convertToJava((long) -14,
DataTypes.BIGINT())).isEqualTo((long) -14);
+ assertThat(convertToJava((long) 17,
DataTypes.BIGINT())).isEqualTo((long) 17);
+ assertThat(convertToJava(null, DataTypes.BIGINT())).isNull();
+ }
+
+ @Test
+ void testConvertToFloat() {
+ assertThat(convertToJava((float) 11,
DataTypes.FLOAT())).isEqualTo((float) 11);
+ assertThat(convertToJava((float) -14,
DataTypes.FLOAT())).isEqualTo((float) -14);
+ assertThat(convertToJava((float) 17,
DataTypes.FLOAT())).isEqualTo((float) 17);
+ assertThat(convertToJava(null, DataTypes.FLOAT())).isNull();
+ }
+
+ @Test
+ void testConvertToDouble() {
+ assertThat(convertToJava((double) 11,
DataTypes.DOUBLE())).isEqualTo((double) 11);
+ assertThat(convertToJava((double) -14,
DataTypes.DOUBLE())).isEqualTo((double) -14);
+ assertThat(convertToJava((double) 17,
DataTypes.DOUBLE())).isEqualTo((double) 17);
+ assertThat(convertToJava(null, DataTypes.DOUBLE())).isNull();
+ }
+
+ @Test
+ void testConvertToDecimal() {
+ assertThat(convertToJava(new BigDecimal("4.2"), DataTypes.DECIMAL(2,
1)))
+ .isInstanceOf(BigDecimal.class)
+ .hasToString("4.2");
+ assertThat(convertToJava(new BigDecimal("-3.1415926"),
DataTypes.DECIMAL(20, 10)))
+ .isInstanceOf(BigDecimal.class)
+ .hasToString("-3.1415926");
+
+ assertThat(convertToJava(DecimalData.fromUnscaledLong(42, 2, 1),
DataTypes.DECIMAL(2, 1)))
+ .isInstanceOf(BigDecimal.class)
+ .hasToString("4.2");
+ assertThat(
+ convertToJava(
+ DecimalData.fromUnscaledLong(-31415926, 14, 7),
+ DataTypes.DECIMAL(14, 7)))
+ .isInstanceOf(BigDecimal.class)
+ .hasToString("-3.1415926");
+
+ assertThat(convertToJava(null, DataTypes.DECIMAL(20, 10))).isNull();
+ }
+
+ @Test
+ void testConvertToDate() {
+ assertThat(convertToJava(LocalDate.of(2017, 12, 31), DataTypes.DATE()))
+ .isInstanceOf(LocalDate.class)
+ .hasToString("2017-12-31");
+ assertThat(convertToJava(DateData.fromEpochDay(14417),
DataTypes.DATE()))
+ .isInstanceOf(LocalDate.class)
+ .hasToString("2009-06-22");
+ assertThat(convertToJava(null, DataTypes.DATE())).isNull();
+ }
+
+ @Test
+ void testConvertToTime() {
+ assertThat(convertToJava(LocalTime.of(21, 48, 25), DataTypes.TIME(0)))
+ .isInstanceOf(LocalTime.class)
+ .hasToString("21:48:25");
+ assertThat(convertToJava(LocalTime.ofSecondOfDay(14419),
DataTypes.TIME(0)))
+ .isInstanceOf(LocalTime.class)
+ .hasToString("04:00:19");
+ assertThat(convertToJava(LocalTime.of(21, 48, 25, 123456789),
DataTypes.TIME(3)))
+ .isInstanceOf(LocalTime.class)
+ .hasToString("21:48:25.123456789");
+ assertThat(convertToJava(LocalTime.ofNanoOfDay(14419123456789L),
DataTypes.TIME(3)))
+ .isInstanceOf(LocalTime.class)
+ .hasToString("04:00:19.123456789");
+
+ assertThat(
+ convertToJava(
+ TimeData.fromLocalTime(LocalTime.of(21, 48,
25)),
+ DataTypes.TIME(0)))
+ .isInstanceOf(LocalTime.class)
+ .hasToString("21:48:25");
+ assertThat(convertToJava(TimeData.fromSecondOfDay(14419),
DataTypes.TIME(0)))
+ .isInstanceOf(LocalTime.class)
+ .hasToString("04:00:19");
+ assertThat(
+ convertToJava(
+ TimeData.fromLocalTime(LocalTime.of(21, 48,
25, 123456789)),
+ DataTypes.TIME(3)))
+ .isInstanceOf(LocalTime.class)
+ .hasToString("21:48:25.123");
+ assertThat(convertToJava(TimeData.fromNanoOfDay(14419123456789L),
DataTypes.TIME(3)))
+ .isInstanceOf(LocalTime.class)
+ .hasToString("04:00:19.123");
+ assertThat(convertToJava(null, DataTypes.TIME())).isNull();
+ }
+
+ @Test
+ void testConvertToTimestamp() {
+ assertThat(convertToJava(TimestampData.fromMillis(2147483648491L),
DataTypes.TIMESTAMP(3)))
+ .isInstanceOf(LocalDateTime.class)
+ .hasToString("2038-01-19T03:14:08.491");
+ assertThat(
+ convertToJava(
+ LocalDateTime.of(2019, 12, 25, 21, 48, 25,
123456789),
+ DataTypes.TIMESTAMP(9)))
+ .isInstanceOf(LocalDateTime.class)
+ .hasToString("2019-12-25T21:48:25.123456789");
+ assertThat(convertToJava(null, DataTypes.TIMESTAMP())).isNull();
+ }
+
+ @Test
+ void testConvertToZonedTimestamp() {
+ assertThat(
+ convertToJava(
+ ZonedTimestampData.of(2143658709L, 0, "UTC"),
+ DataTypes.TIMESTAMP_TZ(3)))
+ .isInstanceOf(ZonedDateTime.class)
+ .hasToString("1970-01-25T19:27:38.709Z[UTC]");
+ assertThat(
+ convertToJava(
+ ZonedDateTime.of(
+ 2019,
+ 12,
+ 25,
+ 21,
+ 48,
+ 25,
+ 123456789,
+ ZoneId.of("UTC+08:00")),
+ DataTypes.TIMESTAMP_TZ(9)))
+ .isInstanceOf(ZonedDateTime.class)
+ .hasToString("2019-12-25T21:48:25.123456789+08:00[UTC+08:00]");
+ assertThat(convertToJava(null, DataTypes.TIMESTAMP_TZ())).isNull();
+ }
+
+ @Test
+ void testConvertToLocalZonedTimestamp() {
+ assertThat(
+ convertToJava(
+
LocalZonedTimestampData.fromEpochMillis(3141592653589L),
+ DataTypes.TIMESTAMP_LTZ(3)))
+ .isInstanceOf(Instant.class)
+ .hasToString("2069-07-21T00:37:33.589Z");
+ assertThat(
+ convertToJava(
+ Instant.ofEpochSecond(2718281828L, 123456789),
+ DataTypes.TIMESTAMP_LTZ(9)))
+ .isInstanceOf(Instant.class)
+ .hasToString("2056-02-20T14:17:08.123456789Z");
+ assertThat(convertToJava(null, DataTypes.TIMESTAMP_LTZ())).isNull();
+ }
+
+ // Converts both a java.util.List and an internal GenericArrayData against ARRAY<STRING>,
+ // asserting the result is a List whose elements are java.lang.String.
+ @Test
+ void testConvertToArray() {
+     assertThat(
+                     convertToJava(
+                             Arrays.asList("Alice", "Bob", "Charlie"),
+                             DataTypes.ARRAY(DataTypes.STRING())))
+             .isInstanceOf(List.class)
+             .hasToString("[Alice, Bob, Charlie]")
+             .extracting(List.class::cast)
+             .extracting(s -> s.get(0))
+             .isInstanceOf(String.class);
+     assertThat(
+                     convertToJava(
+                             new GenericArrayData(
+                                     new StringData[] {
+                                         BinaryStringData.fromString("Derrida"),
+                                         BinaryStringData.fromString("Enigma"),
+                                         BinaryStringData.fromString("Fall")
+                                     }),
+                             DataTypes.ARRAY(DataTypes.STRING())))
+             .isInstanceOf(List.class)
+             .hasToString("[Derrida, Enigma, Fall]")
+             .extracting(List.class::cast)
+             .extracting(s -> s.get(0))
+             .isInstanceOf(String.class);
+     // The null case must target the ARRAY type under test; it previously used
+     // TIMESTAMP_LTZ (copy-paste), leaving the array null path uncovered.
+     assertThat(convertToJava(null, DataTypes.ARRAY(DataTypes.STRING()))).isNull();
+ }
+
+ // Converts a java.util.Map and an internal GenericMapData against MAP<STRING, ROW<INT, STRING>>,
+ // asserting the result is a Map from String keys to List-shaped rows; null input yields null.
+ @Test
+ void testConvertToMap() {
+     MapType targetType =
+             DataTypes.MAP(
+                     DataTypes.STRING(), DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()));
+     Map<String, List<Object>> originalMap =
+             Map.of(
+                     "Alice", List.of(5, "AILISI"),
+                     "Bob", List.of(3, "BAOBO"),
+                     "Cancan", List.of(6, "KANGKANG"));
+     // Map.of has an unspecified iteration order, so compare entries without regard to order;
+     // the order-sensitive containsExactlyEntriesOf would tie the test to iteration order.
+     assertThat(convertToJava(originalMap, targetType))
+             .isInstanceOf(Map.class)
+             .asInstanceOf(map(String.class, List.class))
+             .containsExactlyInAnyOrderEntriesOf(originalMap);
+
+     assertThat(
+                     convertToJava(
+                             new GenericMapData(
+                                     Map.of(
+                                             BinaryStringData.fromString("Derrida"),
+                                             GenericRecordData.of(
+                                                     7, BinaryStringData.fromString("DELIDA")))),
+                             targetType))
+             .isInstanceOf(Map.class)
+             .asInstanceOf(map(String.class, List.class))
+             .containsExactlyEntriesOf(Map.of("Derrida", List.of(7, "DELIDA")));
+     assertThat(convertToJava(null, targetType)).isNull();
+ }
+
+ @Test
+ void testConvertToRow() {
+ RowType targetType =
+ DataTypes.ROW(
+ DataTypes.STRING(), DataTypes.ROW(DataTypes.INT(),
DataTypes.STRING()));
+ assertThat(convertToJava(List.of("Alice", List.of(5, "AILISI")),
targetType))
+ .isInstanceOf(List.class)
+ .asInstanceOf(list(Object.class))
+ .containsExactly("Alice", List.of(5, "AILISI"));
+
+ assertThat(convertToJava(List.of("Bob", List.of(3, "BAOBO")),
targetType))
+ .isInstanceOf(List.class)
+ .asInstanceOf(list(Object.class))
+ .containsExactly("Bob", List.of(3, "BAOBO"));
+
+ assertThat(convertToJava(null, targetType)).isNull();
+ }
+}
diff --git
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/VariantConvertingTest.java
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/VariantConvertingTest.java
new file mode 100644
index 000000000..fbcc66fd1
--- /dev/null
+++
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/VariantConvertingTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.variant.BinaryVariantBuilder;
+import org.apache.flink.cdc.common.types.variant.Variant;
+import org.apache.flink.cdc.common.types.variant.VariantBuilder;
+import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for values of {@link org.apache.flink.cdc.common.types.VariantType}: passing them through
+ * the internal and Java object converters, and coercing them to strings.
+ */
+class VariantConvertingTest {
+
+    private static final BinaryVariantBuilder BUILDER = new BinaryVariantBuilder();
+    private static final VariantBuilder.VariantArrayBuilder ARRAY_BUILDER =
+            new BinaryVariantBuilder.VariantArrayBuilder();
+    private static final BinaryVariantBuilder.VariantObjectBuilder OBJECT_BUILDER =
+            new BinaryVariantBuilder.VariantObjectBuilder(true);
+
+    // Immutable sample values shared by the scalar, array and object fixtures below.
+    private static final BigDecimal SAMPLE_DECIMAL = new BigDecimal("23");
+    private static final Instant SAMPLE_INSTANT = Instant.ofEpochMilli(29);
+    private static final LocalDate SAMPLE_DATE = LocalDate.ofEpochDay(31);
+    private static final LocalDateTime SAMPLE_DATE_TIME =
+            LocalDateTime.ofEpochSecond(37, 37, ZoneOffset.UTC);
+
+    /**
+     * Fixture variants: fourteen scalars (including a null variant), then one array and one object
+     * that each contain the same fourteen values.
+     */
+    private static final Variant[] TEST_VARIANTS = {
+        BUILDER.of(true),
+        BUILDER.of((byte) 2),
+        BUILDER.of((short) 3),
+        BUILDER.of(5),
+        BUILDER.of(7L),
+        BUILDER.of("11"),
+        BUILDER.of(13.0d),
+        BUILDER.of(17.0f),
+        BUILDER.of("19".getBytes()),
+        BUILDER.of(SAMPLE_DECIMAL),
+        BUILDER.of(SAMPLE_INSTANT),
+        BUILDER.of(SAMPLE_DATE),
+        BUILDER.of(SAMPLE_DATE_TIME),
+        BUILDER.ofNull(),
+        ARRAY_BUILDER
+                .add(BUILDER.of(true))
+                .add(BUILDER.of((byte) 2))
+                .add(BUILDER.of((short) 3))
+                .add(BUILDER.of(5))
+                .add(BUILDER.of(7L))
+                .add(BUILDER.of("11"))
+                .add(BUILDER.of(13.0d))
+                .add(BUILDER.of(17.0f))
+                .add(BUILDER.of("19".getBytes()))
+                .add(BUILDER.of(SAMPLE_DECIMAL))
+                .add(BUILDER.of(SAMPLE_INSTANT))
+                .add(BUILDER.of(SAMPLE_DATE))
+                .add(BUILDER.of(SAMPLE_DATE_TIME))
+                .add(BUILDER.ofNull())
+                .build(),
+        OBJECT_BUILDER
+                .add("col_bool", BUILDER.of(true))
+                .add("col_tinyint", BUILDER.of((byte) 2))
+                .add("col_shortint", BUILDER.of((short) 3))
+                .add("col_int", BUILDER.of(5))
+                .add("col_bigint", BUILDER.of(7L))
+                .add("col_string", BUILDER.of("11"))
+                .add("col_double", BUILDER.of(13.0d))
+                .add("col_float", BUILDER.of(17.0f))
+                .add("col_bytes", BUILDER.of("19".getBytes()))
+                .add("col_decimal", BUILDER.of(SAMPLE_DECIMAL))
+                .add("col_timestamp", BUILDER.of(SAMPLE_INSTANT))
+                .add("col_date", BUILDER.of(SAMPLE_DATE))
+                .add("col_datetime", BUILDER.of(SAMPLE_DATE_TIME))
+                .add("col_null", BUILDER.ofNull())
+                .build()
+    };
+
+    /** Expected string form of each entry of {@link #TEST_VARIANTS}, in the same order. */
+    private static final String[] EXPECTED_STRINGS = {
+        "true",
+        "2",
+        "3",
+        "5",
+        "7",
+        "\"11\"",
+        "13.0",
+        "17.0",
+        "\"MTk=\"",
+        "23",
+        "\"1970-01-01T00:00:00.029+00:00\"",
+        "\"1970-02-01\"",
+        "\"1970-01-01T00:00:37\"",
+        "null",
+        "[true,2,3,5,7,\"11\",13.0,17.0,\"MTk=\",23,\"1970-01-01T00:00:00.029+00:00\",\"1970-02-01\",\"1970-01-01T00:00:37\",null]",
+        "{\"col_bigint\":7,\"col_bool\":true,\"col_bytes\":\"MTk=\",\"col_date\":\"1970-02-01\",\"col_datetime\":\"1970-01-01T00:00:37\",\"col_decimal\":23,\"col_double\":13.0,\"col_float\":17.0,\"col_int\":5,\"col_null\":null,\"col_shortint\":3,\"col_string\":\"11\",\"col_timestamp\":\"1970-01-01T00:00:00.029+00:00\",\"col_tinyint\":2}"
+    };
+
+    /** Converting each fixture to its internal form with VARIANT type must yield an equal value. */
+    @Test
+    void testConvertingFromVariant() {
+        assertThat(Stream.of(TEST_VARIANTS))
+                .map(
+                        variant ->
+                                InternalObjectConverter.convertToInternal(
+                                        variant, DataTypes.VARIANT()))
+                .containsExactly(TEST_VARIANTS);
+    }
+
+    /** Converting each fixture to its Java form with VARIANT type must yield an equal value. */
+    @Test
+    void testConvertingToVariant() {
+        assertThat(Stream.of(TEST_VARIANTS))
+                .map(variant -> JavaObjectConverter.convertToJava(variant, DataTypes.VARIANT()))
+                .containsExactly(TEST_VARIANTS);
+    }
+
+    /** Coercing each fixture from VARIANT to STRING must produce the expected string data. */
+    @Test
+    void testVariantTypeCoercion() {
+        List<BinaryStringData> expectedStringResult =
+                Stream.of(EXPECTED_STRINGS)
+                        .map(BinaryStringData::new)
+                        .collect(Collectors.toList());
+
+        assertThat(Stream.of(TEST_VARIANTS))
+                .map(
+                        source ->
+                                SchemaMergingUtils.coerceObject(
+                                        "UTC", source, DataTypes.VARIANT(), DataTypes.STRING()))
+                .containsExactlyElementsOf(expectedStringResult);
+    }
+}
diff --git
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
index cc653857e..1f53668d4 100644
---
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
+++
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
@@ -104,6 +104,7 @@ class SchemaMergingUtilsTest {
private static final DataType ROW = DataTypes.ROW(INT, STRING);
private static final DataType ARRAY = DataTypes.ARRAY(STRING);
private static final DataType MAP = DataTypes.MAP(INT, STRING);
+ private static final DataType VARIANT = DataTypes.VARIANT();
private static final List<DataType> ALL_TYPES =
Arrays.asList(
@@ -130,7 +131,8 @@ class SchemaMergingUtilsTest {
// Complex types
ROW,
ARRAY,
- MAP);
+ MAP,
+ VARIANT);
private static final Map<DataType, Object> DUMMY_OBJECTS =
ImmutableMap.of(
@@ -966,35 +968,35 @@ class SchemaMergingUtilsTest {
Arrays.asList(
STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
- STRING));
+ STRING, STRING));
assertTypeMergingVector(
CHAR,
Arrays.asList(
STRING, CHAR, STRING, STRING, STRING, STRING, STRING,
STRING, STRING,
STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
- STRING));
+ STRING, STRING));
assertTypeMergingVector(
VARCHAR,
Arrays.asList(
STRING, STRING, VARCHAR, STRING, STRING, STRING,
STRING, STRING, STRING,
STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
- STRING));
+ STRING, STRING));
assertTypeMergingVector(
BINARY,
Arrays.asList(
STRING, STRING, STRING, BINARY, STRING, STRING,
STRING, STRING, STRING,
STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
- STRING));
+ STRING, STRING));
assertTypeMergingVector(
VARBINARY,
Arrays.asList(
STRING, STRING, STRING, STRING, VARBINARY, STRING,
STRING, STRING, STRING,
STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
- STRING));
+ STRING, STRING));
// 8-bit TINYINT could fit into FLOAT (24 sig bits) or DOUBLE (53 sig
bits)
assertTypeMergingVector(
@@ -1002,7 +1004,7 @@ class SchemaMergingUtilsTest {
Arrays.asList(
STRING, STRING, STRING, STRING, STRING, TINYINT,
SMALLINT, INT, BIGINT,
DECIMAL, FLOAT, DOUBLE, STRING, STRING, STRING,
STRING, STRING, STRING,
- STRING));
+ STRING, STRING));
// 16-bit SMALLINT could fit into FLOAT (24 sig bits) or DOUBLE (53
sig bits)
assertTypeMergingVector(
@@ -1010,42 +1012,43 @@ class SchemaMergingUtilsTest {
Arrays.asList(
STRING, STRING, STRING, STRING, STRING, SMALLINT,
SMALLINT, INT, BIGINT,
DECIMAL, FLOAT, DOUBLE, STRING, STRING, STRING,
STRING, STRING, STRING,
- STRING));
+ STRING, STRING));
// 32-bit INT could fit into DOUBLE (53 sig bits)
assertTypeMergingVector(
INT,
Arrays.asList(
STRING, STRING, STRING, STRING, STRING, INT, INT, INT,
BIGINT, DECIMAL,
- DOUBLE, DOUBLE, STRING, STRING, STRING, STRING,
STRING, STRING, STRING));
+ DOUBLE, DOUBLE, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
+ STRING));
assertTypeMergingVector(
BIGINT,
Arrays.asList(
STRING, STRING, STRING, STRING, STRING, BIGINT,
BIGINT, BIGINT, BIGINT,
DECIMAL, DOUBLE, DOUBLE, STRING, STRING, STRING,
STRING, STRING, STRING,
- STRING));
+ STRING, STRING));
assertTypeMergingVector(
DECIMAL,
Arrays.asList(
STRING, STRING, STRING, STRING, STRING, DECIMAL,
DECIMAL, DECIMAL, DECIMAL,
DECIMAL, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
- STRING));
+ STRING, STRING));
assertTypeMergingVector(
FLOAT,
Arrays.asList(
STRING, STRING, STRING, STRING, STRING, FLOAT, FLOAT,
DOUBLE, DOUBLE,
STRING, FLOAT, DOUBLE, STRING, STRING, STRING, STRING,
STRING, STRING,
- STRING));
+ STRING, STRING));
assertTypeMergingVector(
DOUBLE,
Arrays.asList(
STRING, STRING, STRING, STRING, STRING, DOUBLE,
DOUBLE, DOUBLE, DOUBLE,
STRING, DOUBLE, DOUBLE, STRING, STRING, STRING,
STRING, STRING, STRING,
- STRING));
+ STRING, STRING));
assertTypeMergingVector(
TIMESTAMP,
@@ -1068,6 +1071,7 @@ class SchemaMergingUtilsTest {
STRING,
STRING,
STRING,
+ STRING,
STRING));
assertTypeMergingVector(
@@ -1091,6 +1095,7 @@ class SchemaMergingUtilsTest {
STRING,
STRING,
STRING,
+ STRING,
STRING));
assertTypeMergingVector(
@@ -1114,6 +1119,7 @@ class SchemaMergingUtilsTest {
STRING,
STRING,
STRING,
+ STRING,
STRING));
assertTypeMergingVector(
@@ -1121,13 +1127,13 @@ class SchemaMergingUtilsTest {
Arrays.asList(
STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
STRING, STRING, STRING, STRING, STRING, STRING, TIME,
STRING, STRING,
- STRING));
+ STRING, STRING));
assertTypeMergingVector(
ROW,
Arrays.asList(
STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
- STRING, STRING, STRING, STRING, STRING, STRING,
STRING, ROW, STRING,
+ STRING, STRING, STRING, STRING, STRING, STRING,
STRING, ROW, STRING, STRING,
STRING));
assertTypeMergingVector(
@@ -1135,14 +1141,21 @@ class SchemaMergingUtilsTest {
Arrays.asList(
STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, ARRAY,
- STRING));
+ STRING, STRING));
assertTypeMergingVector(
MAP,
+ Arrays.asList(
+ STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
+ STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING, MAP,
+ STRING));
+
+ assertTypeMergingVector(
+ VARIANT,
Arrays.asList(
STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
STRING, STRING, STRING, STRING, STRING, STRING,
STRING, STRING, STRING,
- MAP));
+ STRING, VARIANT));
}
private static void assertTypeMergingVector(DataType incomingType,
List<DataType> resultTypes) {