This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d9e1c627 [FLINK-38879][pipeline-connector][paimon] Add support for creating and writing table with variant type. (#4228)
7d9e1c627 is described below

commit 7d9e1c627a1e9c85642bba6e8f6fd2d3b2473aa2
Author: Kunni <[email protected]>
AuthorDate: Wed Jan 21 09:49:31 2026 +0800

    [FLINK-38879][pipeline-connector][paimon] Add support for creating and writing table with variant type. (#4228)
---
 .../paimon/sink/PaimonMetadataApplier.java         |  7 +--
 .../paimon/sink/SchemaChangeProvider.java          | 18 ++-----
 .../connectors/paimon/sink/utils/TypeUtils.java    | 63 ++++++++++++++++++++++
 .../paimon/sink/v2/PaimonWriterHelper.java         | 26 ++++++---
 .../paimon/sink/PaimonHashFunctionTest.java        | 17 ++++--
 .../paimon/sink/PaimonMetadataApplierTest.java     | 26 ++++++++-
 .../paimon/sink/v2/PaimonWriterHelperTest.java     | 16 ++++--
 7 files changed, 138 insertions(+), 35 deletions(-)
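
For readers skimming the patch: a minimal sketch (illustrative only, not part of the
commit) of what the change enables on the CDC side, namely declaring a variant column
in a schema and building a variant value from JSON, using the same APIs the updated
tests below exercise. The class names and the boolean flag passed to parseJson are
taken from the test code; their exact semantics are assumptions here.

    import java.io.IOException;

    import org.apache.flink.cdc.common.schema.Schema;
    import org.apache.flink.cdc.common.types.DataTypes;
    import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;

    public class VariantSchemaSketch {
        public static void main(String[] args) throws IOException {
            // A CDC schema with a VARIANT column; the Paimon metadata applier in this
            // patch maps it to Paimon's VariantType when creating the table.
            Schema schema =
                    Schema.newBuilder()
                            .physicalColumn("id", DataTypes.STRING().notNull())
                            .physicalColumn("payload", DataTypes.VARIANT())
                            .primaryKey("id")
                            .build();

            // Building a variant value from JSON, mirroring the updated tests;
            // the second boolean argument is passed as false there as well.
            Object variant =
                    BinaryVariantInternalBuilder.parseJson(
                            "{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false);

            System.out.println(schema + " / " + variant);
        }
    }
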

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index 51e0fb0ed..93f12ef00 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -32,14 +32,13 @@ import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
 import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
-import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
@@ -173,9 +172,7 @@ public class PaimonMetadataApplier implements MetadataApplier {
                             (column) ->
                                     builder.column(
                                             column.getName(),
-                                            LogicalTypeConversion.toDataType(
-                                                    DataTypeUtils.toFlinkDataType(column.getType())
-                                                            .getLogicalType()),
+                                            TypeUtils.toPaimonDataType(column.getType()),
                                             column.getComment()));
             List<String> partitionKeys = new ArrayList<>();
             List<String> primaryKeys = schema.primaryKeys();
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
index f6e0df907..22ee7ade1 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
@@ -23,9 +23,8 @@ import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
 import org.apache.flink.cdc.common.types.TimestampType;
 import org.apache.flink.cdc.common.types.ZonedTimestampType;
-import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;
 
-import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.schema.SchemaChange;
 
 import java.util.ArrayList;
@@ -54,10 +53,7 @@ public class SchemaChangeProvider {
         result.add(
                 SchemaChange.addColumn(
                         columnWithPosition.getAddColumn().getName(),
-                        LogicalTypeConversion.toDataType(
-                                DataTypeUtils.toFlinkDataType(
-                                                columnWithPosition.getAddColumn().getType())
-                                        .getLogicalType()),
+                        TypeUtils.toPaimonDataType(columnWithPosition.getAddColumn().getType()),
                         columnWithPosition.getAddColumn().getComment()));
         // if default value express exists, we need to set the default value to the table
         // option
@@ -89,10 +85,7 @@ public class SchemaChangeProvider {
         result.add(
                 SchemaChange.addColumn(
                         columnWithPosition.getAddColumn().getName(),
-                        LogicalTypeConversion.toDataType(
-                                DataTypeUtils.toFlinkDataType(
-                                                columnWithPosition.getAddColumn().getType())
-                                        .getLogicalType()),
+                        TypeUtils.toPaimonDataType(columnWithPosition.getAddColumn().getType()),
                         columnWithPosition.getAddColumn().getComment(),
                         move));
         // if default value express exists, we need to set the default value to the table
@@ -118,10 +111,7 @@ public class SchemaChangeProvider {
      * @return A SchemaChange object representing the update of the column's data type.
      */
     public static SchemaChange updateColumnType(String oldColumnName, DataType newType) {
-        return SchemaChange.updateColumnType(
-                oldColumnName,
-                LogicalTypeConversion.toDataType(
-                        DataTypeUtils.toFlinkDataType(newType).getLogicalType()));
+        return SchemaChange.updateColumnType(oldColumnName, TypeUtils.toPaimonDataType(newType));
     }
 
     /**
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/utils/TypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/utils/TypeUtils.java
new file mode 100644
index 000000000..729769c0d
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/utils/TypeUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.utils;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.types.VariantType;
+
+/** Utils for data type conversion between CDC and Paimon System. */
+public class TypeUtils {
+
+    /**
+     * Convert Flink CDC DataType to Paimon DataType.
+     *
+     * @param dataType Flink CDC DataType
+     * @return Paimon DataType
+     */
+    public static org.apache.paimon.types.DataType toPaimonDataType(DataType dataType) {
+        // TODO remove this branch after bumping Flink version to 2.2
+        if (dataType.is(DataTypeRoot.VARIANT)) {
+            return new VariantType(dataType.isNullable());
+        } else {
+            return LogicalTypeConversion.toDataType(
+                    DataTypeUtils.toFlinkDataType(dataType).getLogicalType());
+        }
+    }
+
+    /**
+     * Convert Paimon DataType to Flink CDC DataType.
+     *
+     * @param dataType Paimon DataType
+     * @return Flink CDC DataType
+     */
+    public static DataType toCDCDataType(org.apache.paimon.types.DataType dataType) {
+        // TODO remove this branch after bumping Flink version to 2.2
+        if (dataType.is(org.apache.paimon.types.DataTypeRoot.VARIANT)) {
+            return new org.apache.flink.cdc.common.types.VariantType(dataType.isNullable());
+        } else {
+            return DataTypeUtils.fromFlinkDataType(
+                    TypeConversions.fromLogicalToDataType(
+                            LogicalTypeConversion.toLogicalType(dataType)));
+        }
+    }
+}
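
A quick sketch (not part of the commit) of how the new TypeUtils helper above is
meant to be used: VARIANT is mapped directly between the CDC and Paimon type
systems, while every other type still goes through Flink's logical-type bridge,
as the TODO notes, until the Flink dependency is bumped to 2.2.

    import org.apache.flink.cdc.common.types.DataType;
    import org.apache.flink.cdc.common.types.DataTypes;
    import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;

    public class TypeUtilsSketch {
        public static void main(String[] args) {
            // CDC VARIANT -> Paimon VariantType, bypassing the Flink logical-type bridge.
            org.apache.paimon.types.DataType paimonType =
                    TypeUtils.toPaimonDataType(DataTypes.VARIANT());

            // Paimon VariantType -> CDC VariantType, preserving nullability.
            DataType cdcType = TypeUtils.toCDCDataType(paimonType);

            System.out.println(paimonType + " <-> " + cdcType);
        }
    }
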
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
index 02bea1b16..1e34c689b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
@@ -31,10 +31,11 @@ import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypeChecks;
 import org.apache.flink.cdc.common.types.DataTypeRoot;
-import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.common.types.variant.BinaryVariant;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;
 import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.table.types.utils.TypeConversions;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
@@ -43,7 +44,7 @@ import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.data.variant.GenericVariant;
 import org.apache.paimon.memory.MemorySegmentUtils;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowKind;
@@ -176,6 +177,20 @@ public class PaimonWriterHelper {
             case MAP:
                 fieldGetter = new BinaryFieldDataGetter(fieldPos, fieldType.getTypeRoot());
                 break;
+            case VARIANT:
+                fieldGetter =
+                        row -> {
+                            org.apache.flink.cdc.common.types.variant.Variant variant =
+                                    row.getVariant(fieldPos);
+                            Preconditions.checkArgument(
+                                    variant instanceof BinaryVariant,
+                                    "Unsupported variant type: %s",
+                                    variant.getClass());
+                            return new GenericVariant(
+                                    ((BinaryVariant) variant).getValue(),
+                                    ((BinaryVariant) variant).getMetadata());
+                        };
+                break;
             default:
                 throw new IllegalArgumentException(
                         "don't support type of " + fieldType.getTypeRoot());
@@ -278,10 +293,7 @@ public class PaimonWriterHelper {
                                 column ->
                                         Column.physicalColumn(
                                                 column.name(),
-                                                DataTypeUtils.fromFlinkDataType(
-                                                        TypeConversions.fromLogicalToDataType(
-                                                                LogicalTypeConversion.toLogicalType(
-                                                                        column.type()))),
+                                                TypeUtils.toCDCDataType(column.type()),
                                                 column.description()))
                         .collect(Collectors.toList()));
         builder.primaryKey(table.primaryKeys());
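
For reference, a standalone sketch of the conversion the new VARIANT branch above
performs when writing rows: the CDC variant is read from the record and rebuilt as
a Paimon GenericVariant from the same binary value and metadata. The RecordData row
type and its import path are assumed from the surrounding helper; the getters and
the GenericVariant constructor are the ones used in the hunk above.

    import org.apache.flink.cdc.common.data.RecordData;
    import org.apache.flink.cdc.common.types.variant.BinaryVariant;
    import org.apache.flink.cdc.common.types.variant.Variant;
    import org.apache.flink.cdc.common.utils.Preconditions;

    import org.apache.paimon.data.variant.GenericVariant;

    public class VariantFieldSketch {
        // Mirrors the VARIANT case added to PaimonWriterHelper's field getter above.
        public static GenericVariant toPaimonVariant(RecordData row, int fieldPos) {
            Variant variant = row.getVariant(fieldPos);
            Preconditions.checkArgument(
                    variant instanceof BinaryVariant,
                    "Unsupported variant type: %s",
                    variant.getClass());
            BinaryVariant binaryVariant = (BinaryVariant) variant;
            return new GenericVariant(binaryVariant.getValue(), binaryVariant.getMetadata());
        }
    }
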
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
index 1ca73444d..d600149d3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 
 import org.apache.paimon.catalog.Catalog;
@@ -36,6 +37,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Path;
 import java.time.ZoneId;
 import java.util.HashMap;
@@ -72,7 +74,7 @@ class PaimonHashFunctionTest {
     }
 
     @Test
-    public void testHashCodeForAppendOnlyTable() {
+    public void testHashCodeForAppendOnlyTable() throws IOException {
         TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
         Map<String, String> tableOptions = new HashMap<>();
         MetadataApplier metadataApplier =
@@ -82,6 +84,7 @@ class PaimonHashFunctionTest {
                         .physicalColumn("col1", DataTypes.STRING().notNull())
                         .physicalColumn("col2", DataTypes.STRING())
                         .physicalColumn("pt", DataTypes.STRING())
+                        .physicalColumn("variantCol", DataTypes.VARIANT())
                         .build();
         CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
         metadataApplier.applySchemaChange(createTableEvent);
@@ -96,7 +99,9 @@ class PaimonHashFunctionTest {
                                 new Object[] {
                                     BinaryStringData.fromString("1"),
                                     BinaryStringData.fromString("1"),
-                                    BinaryStringData.fromString("2024")
+                                    BinaryStringData.fromString("2024"),
+                                    BinaryVariantInternalBuilder.parseJson(
+                                            "{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false)
                                 }));
         int key1 = hashFunction.hashcode(dataChangeEvent1);
 
@@ -107,7 +112,9 @@ class PaimonHashFunctionTest {
                                 new Object[] {
                                     BinaryStringData.fromString("2"),
                                     BinaryStringData.fromString("1"),
-                                    BinaryStringData.fromString("2024")
+                                    BinaryStringData.fromString("2024"),
+                                    BinaryVariantInternalBuilder.parseJson(
+                                            "{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false)
                                 }));
         int key2 = hashFunction.hashcode(dataChangeEvent2);
 
@@ -118,7 +125,9 @@ class PaimonHashFunctionTest {
                                 new Object[] {
                                     BinaryStringData.fromString("3"),
                                     BinaryStringData.fromString("1"),
-                                    BinaryStringData.fromString("2024")
+                                    BinaryStringData.fromString("2024"),
+                                    BinaryVariantInternalBuilder.parseJson(
+                                            "{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false)
                                 }));
         int key3 = hashFunction.hashcode(dataChangeEvent3);
         assertThat(key1).isBetween(0, 3);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index 0ddcacd8d..7f0ed436d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -189,6 +189,26 @@ class PaimonMetadataApplierTest {
         Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
                 .isEqualTo(tableSchema);
 
+        // Add column with variant type.
+        addedColumns = new ArrayList<>();
+        addedColumns.add(
+                new AddColumnEvent.ColumnWithPosition(
+                        Column.physicalColumn(
+                                "variantCol",
+                                org.apache.flink.cdc.common.types.DataTypes.VARIANT(),
+                                null)));
+        addColumnEvent = new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
+        metadataApplier.applySchemaChange(addColumnEvent);
+        tableSchema =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "col1", DataTypes.STRING().notNull()),
+                                new DataField(
+                                        2, "newcol3", DataTypes.STRING(), null, "col3DefValue"),
+                                new DataField(3, "variantCol", DataTypes.VARIANT(), null, null)));
+        Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
+                .isEqualTo(tableSchema);
+
         // Create table with partition column.
         createTableEvent =
                 new CreateTableEvent(
@@ -412,6 +432,9 @@ class PaimonMetadataApplierTest {
                                         "timestamp_ltz_with_precision",
                                         
org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(
                                                 3))
+                                .physicalColumn(
+                                        "variant",
+                                        org.apache.flink.cdc.common.types.DataTypes.VARIANT())
                                 .primaryKey("col1")
                                 .build());
         metadataApplier.applySchemaChange(createTableEvent);
@@ -445,7 +468,8 @@ class PaimonMetadataApplierTest {
                                 new DataField(
                                         20,
                                         "timestamp_ltz_with_precision",
-                                        DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))));
+                                        DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+                                new DataField(21, "variant", DataTypes.VARIANT())));
         Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
                 .isEqualTo(tableSchema);
     }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
index c6356bf1c..b48761970 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
 import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier;
 import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
@@ -45,6 +46,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.NestedRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.GenericVariant;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.Table;
@@ -54,6 +56,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -68,7 +71,7 @@ class PaimonWriterHelperTest {
     @TempDir public static java.nio.file.Path temporaryFolder;
 
     @Test
-    void testConvertEventToGenericRowOfAllDataTypes() {
+    void testConvertEventToGenericRowOfAllDataTypes() throws IOException {
         RowType rowType =
                 RowType.of(
                         DataTypes.BOOLEAN(),
@@ -92,7 +95,8 @@ class PaimonWriterHelperTest {
                         DataTypes.TIMESTAMP(3),
                         DataTypes.TIMESTAMP_LTZ(),
                         DataTypes.TIMESTAMP_LTZ(3),
-                        DataTypes.STRING());
+                        DataTypes.STRING(),
+                        DataTypes.VARIANT());
         Object[] testData =
                 new Object[] {
                     true,
@@ -117,7 +121,9 @@
                     TimestampData.fromTimestamp(java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
                     LocalZonedTimestampData.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
                     LocalZonedTimestampData.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
-                    null
+                    null,
+                    BinaryVariantInternalBuilder.parseJson(
+                            "{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false)
                 };
         BinaryRecordData recordData = new BinaryRecordDataGenerator(rowType).generate(testData);
         Schema schema = Schema.newBuilder().fromRowDataType(rowType).build();
@@ -154,7 +160,8 @@ class PaimonWriterHelperTest {
                                         java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
                                 Timestamp.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
                                 Timestamp.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
-                                null));
+                                null,
+                                GenericVariant.fromJson("{\"a\":1,\"b\":\"hello\",\"c\":3.1}"));
     }
 
     @Test
@@ -368,6 +375,7 @@ class PaimonWriterHelperTest {
                         .physicalColumn("timestamp_with_precision", 
DataTypes.TIMESTAMP(3))
                         .physicalColumn("timestamp_ltz", 
DataTypes.TIMESTAMP_LTZ())
                         .physicalColumn("timestamp_ltz_with_precision", 
DataTypes.TIMESTAMP_LTZ(3))
+                        .physicalColumn("variant", DataTypes.VARIANT())
                         .primaryKey("col1")
                         .build();
         CreateTableEvent createTableEvent =
