This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 7d9e1c627 [FLINK-38879][pipeline-connector][paimon] Add support for creating and writing table with variant type. (#4228)
7d9e1c627 is described below
commit 7d9e1c627a1e9c85642bba6e8f6fd2d3b2473aa2
Author: Kunni <[email protected]>
AuthorDate: Wed Jan 21 09:49:31 2026 +0800
[FLINK-38879][pipeline-connector][paimon] Add support for creating and writing table with variant type. (#4228)
---
.../paimon/sink/PaimonMetadataApplier.java | 7 +--
.../paimon/sink/SchemaChangeProvider.java | 18 ++-----
.../connectors/paimon/sink/utils/TypeUtils.java | 63 ++++++++++++++++++++++
.../paimon/sink/v2/PaimonWriterHelper.java | 26 ++++++---
.../paimon/sink/PaimonHashFunctionTest.java | 17 ++++--
.../paimon/sink/PaimonMetadataApplierTest.java | 26 ++++++++-
.../paimon/sink/v2/PaimonWriterHelperTest.java | 16 ++++--
7 files changed, 138 insertions(+), 35 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index 51e0fb0ed..93f12ef00 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -32,14 +32,13 @@ import
org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
-import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
@@ -173,9 +172,7 @@ public class PaimonMetadataApplier implements
MetadataApplier {
(column) ->
builder.column(
column.getName(),
- LogicalTypeConversion.toDataType(
-
DataTypeUtils.toFlinkDataType(column.getType())
- .getLogicalType()),
+
TypeUtils.toPaimonDataType(column.getType()),
column.getComment()));
List<String> partitionKeys = new ArrayList<>();
List<String> primaryKeys = schema.primaryKeys();
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
index f6e0df907..22ee7ade1 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
@@ -23,9 +23,8 @@ import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
-import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;
-import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.schema.SchemaChange;
import java.util.ArrayList;
@@ -54,10 +53,7 @@ public class SchemaChangeProvider {
result.add(
SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
- LogicalTypeConversion.toDataType(
- DataTypeUtils.toFlinkDataType(
-
columnWithPosition.getAddColumn().getType())
- .getLogicalType()),
+
TypeUtils.toPaimonDataType(columnWithPosition.getAddColumn().getType()),
columnWithPosition.getAddColumn().getComment()));
// if default value express exists, we need to set the default value
to the table
// option
@@ -89,10 +85,7 @@ public class SchemaChangeProvider {
result.add(
SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
- LogicalTypeConversion.toDataType(
- DataTypeUtils.toFlinkDataType(
-
columnWithPosition.getAddColumn().getType())
- .getLogicalType()),
+
TypeUtils.toPaimonDataType(columnWithPosition.getAddColumn().getType()),
columnWithPosition.getAddColumn().getComment(),
move));
// if default value express exists, we need to set the default value
to the table
@@ -118,10 +111,7 @@ public class SchemaChangeProvider {
* @return A SchemaChange object representing the update of the column's
data type.
*/
public static SchemaChange updateColumnType(String oldColumnName, DataType
newType) {
- return SchemaChange.updateColumnType(
- oldColumnName,
- LogicalTypeConversion.toDataType(
-
DataTypeUtils.toFlinkDataType(newType).getLogicalType()));
+ return SchemaChange.updateColumnType(oldColumnName,
TypeUtils.toPaimonDataType(newType));
}
/**
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/utils/TypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/utils/TypeUtils.java
new file mode 100644
index 000000000..729769c0d
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/utils/TypeUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.utils;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.types.VariantType;
+
+/** Utils for data type conversion between CDC and Paimon System. */
+public class TypeUtils {
+
+ /**
+ * Convert Flink CDC DataType to Paimon DataType.
+ *
+ * @param dataType Flink CDC DataType
+ * @return Paimon DataType
+ */
+ public static org.apache.paimon.types.DataType toPaimonDataType(DataType
dataType) {
+ // TODO remove this branch after bumping Flink version to 2.2
+ if (dataType.is(DataTypeRoot.VARIANT)) {
+ return new VariantType(dataType.isNullable());
+ } else {
+ return LogicalTypeConversion.toDataType(
+ DataTypeUtils.toFlinkDataType(dataType).getLogicalType());
+ }
+ }
+
+ /**
+ * Convert Paimon DataType to Flink CDC DataType.
+ *
+ * @param dataType Paimon DataType
+ * @return Flink CDC DataType
+ */
+ public static DataType toCDCDataType(org.apache.paimon.types.DataType
dataType) {
+ // TODO remove this branch after bumping Flink version to 2.2
+ if (dataType.is(org.apache.paimon.types.DataTypeRoot.VARIANT)) {
+ return new
org.apache.flink.cdc.common.types.VariantType(dataType.isNullable());
+ } else {
+ return DataTypeUtils.fromFlinkDataType(
+ TypeConversions.fromLogicalToDataType(
+ LogicalTypeConversion.toLogicalType(dataType)));
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
index 02bea1b16..1e34c689b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
@@ -31,10 +31,11 @@ import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeChecks;
import org.apache.flink.cdc.common.types.DataTypeRoot;
-import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.common.types.variant.BinaryVariant;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;
import
org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
@@ -43,7 +44,7 @@ import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowKind;
@@ -176,6 +177,20 @@ public class PaimonWriterHelper {
case MAP:
fieldGetter = new BinaryFieldDataGetter(fieldPos,
fieldType.getTypeRoot());
break;
+ case VARIANT:
+ fieldGetter =
+ row -> {
+ org.apache.flink.cdc.common.types.variant.Variant
variant =
+ row.getVariant(fieldPos);
+ Preconditions.checkArgument(
+ variant instanceof BinaryVariant,
+ "Unsupported variant type: %s",
+ variant.getClass());
+ return new GenericVariant(
+ ((BinaryVariant) variant).getValue(),
+ ((BinaryVariant) variant).getMetadata());
+ };
+ break;
default:
throw new IllegalArgumentException(
"don't support type of " + fieldType.getTypeRoot());
@@ -278,10 +293,7 @@ public class PaimonWriterHelper {
column ->
Column.physicalColumn(
column.name(),
-
DataTypeUtils.fromFlinkDataType(
-
TypeConversions.fromLogicalToDataType(
-
LogicalTypeConversion.toLogicalType(
-
column.type()))),
+
TypeUtils.toCDCDataType(column.type()),
column.description()))
.collect(Collectors.toList()));
builder.primaryKey(table.primaryKeys());
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
index 1ca73444d..d600149d3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.paimon.catalog.Catalog;
@@ -36,6 +37,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
+import java.io.IOException;
import java.nio.file.Path;
import java.time.ZoneId;
import java.util.HashMap;
@@ -72,7 +74,7 @@ class PaimonHashFunctionTest {
}
@Test
- public void testHashCodeForAppendOnlyTable() {
+ public void testHashCodeForAppendOnlyTable() throws IOException {
TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
Map<String, String> tableOptions = new HashMap<>();
MetadataApplier metadataApplier =
@@ -82,6 +84,7 @@ class PaimonHashFunctionTest {
.physicalColumn("col1", DataTypes.STRING().notNull())
.physicalColumn("col2", DataTypes.STRING())
.physicalColumn("pt", DataTypes.STRING())
+ .physicalColumn("variantCol", DataTypes.VARIANT())
.build();
CreateTableEvent createTableEvent = new CreateTableEvent(tableId,
schema);
metadataApplier.applySchemaChange(createTableEvent);
@@ -96,7 +99,9 @@ class PaimonHashFunctionTest {
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1"),
- BinaryStringData.fromString("2024")
+ BinaryStringData.fromString("2024"),
+ BinaryVariantInternalBuilder.parseJson(
+
"{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false)
}));
int key1 = hashFunction.hashcode(dataChangeEvent1);
@@ -107,7 +112,9 @@ class PaimonHashFunctionTest {
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("1"),
- BinaryStringData.fromString("2024")
+ BinaryStringData.fromString("2024"),
+ BinaryVariantInternalBuilder.parseJson(
+
"{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false)
}));
int key2 = hashFunction.hashcode(dataChangeEvent2);
@@ -118,7 +125,9 @@ class PaimonHashFunctionTest {
new Object[] {
BinaryStringData.fromString("3"),
BinaryStringData.fromString("1"),
- BinaryStringData.fromString("2024")
+ BinaryStringData.fromString("2024"),
+ BinaryVariantInternalBuilder.parseJson(
+
"{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false)
}));
int key3 = hashFunction.hashcode(dataChangeEvent3);
assertThat(key1).isBetween(0, 3);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index 0ddcacd8d..7f0ed436d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -189,6 +189,26 @@ class PaimonMetadataApplierTest {
Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
.isEqualTo(tableSchema);
+ // Add column with variant type.
+ addedColumns = new ArrayList<>();
+ addedColumns.add(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn(
+ "variantCol",
+
org.apache.flink.cdc.common.types.DataTypes.VARIANT(),
+ null)));
+ addColumnEvent = new AddColumnEvent(TableId.parse("test.table1"),
addedColumns);
+ metadataApplier.applySchemaChange(addColumnEvent);
+ tableSchema =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "col1",
DataTypes.STRING().notNull()),
+ new DataField(
+ 2, "newcol3", DataTypes.STRING(),
null, "col3DefValue"),
+ new DataField(3, "variantCol",
DataTypes.VARIANT(), null, null)));
+
Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
+ .isEqualTo(tableSchema);
+
// Create table with partition column.
createTableEvent =
new CreateTableEvent(
@@ -412,6 +432,9 @@ class PaimonMetadataApplierTest {
"timestamp_ltz_with_precision",
org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(
3))
+ .physicalColumn(
+ "variant",
+
org.apache.flink.cdc.common.types.DataTypes.VARIANT())
.primaryKey("col1")
.build());
metadataApplier.applySchemaChange(createTableEvent);
@@ -445,7 +468,8 @@ class PaimonMetadataApplierTest {
new DataField(
20,
"timestamp_ltz_with_precision",
-
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))));
+
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+ new DataField(21, "variant",
DataTypes.VARIANT())));
Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
.isEqualTo(tableSchema);
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
index c6356bf1c..b48761970 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier;
import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
@@ -45,6 +46,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.NestedRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.GenericVariant;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
@@ -54,6 +56,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
+import java.io.IOException;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
@@ -68,7 +71,7 @@ class PaimonWriterHelperTest {
@TempDir public static java.nio.file.Path temporaryFolder;
@Test
- void testConvertEventToGenericRowOfAllDataTypes() {
+ void testConvertEventToGenericRowOfAllDataTypes() throws IOException {
RowType rowType =
RowType.of(
DataTypes.BOOLEAN(),
@@ -92,7 +95,8 @@ class PaimonWriterHelperTest {
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP_LTZ(),
DataTypes.TIMESTAMP_LTZ(3),
- DataTypes.STRING());
+ DataTypes.STRING(),
+ DataTypes.VARIANT());
Object[] testData =
new Object[] {
true,
@@ -117,7 +121,9 @@ class PaimonWriterHelperTest {
TimestampData.fromTimestamp(java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
LocalZonedTimestampData.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
LocalZonedTimestampData.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
- null
+ null,
+ BinaryVariantInternalBuilder.parseJson(
+ "{\"a\":1,\"b\":\"hello\",\"c\":3.1}", false)
};
BinaryRecordData recordData = new
BinaryRecordDataGenerator(rowType).generate(testData);
Schema schema = Schema.newBuilder().fromRowDataType(rowType).build();
@@ -154,7 +160,8 @@ class PaimonWriterHelperTest {
java.sql.Timestamp.valueOf("2023-01-01
00:00:00")),
Timestamp.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
Timestamp.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
- null));
+ null,
+
GenericVariant.fromJson("{\"a\":1,\"b\":\"hello\",\"c\":3.1}")));
}
@Test
@@ -368,6 +375,7 @@ class PaimonWriterHelperTest {
.physicalColumn("timestamp_with_precision",
DataTypes.TIMESTAMP(3))
.physicalColumn("timestamp_ltz",
DataTypes.TIMESTAMP_LTZ())
.physicalColumn("timestamp_ltz_with_precision",
DataTypes.TIMESTAMP_LTZ(3))
+ .physicalColumn("variant", DataTypes.VARIANT())
.primaryKey("col1")
.build();
CreateTableEvent createTableEvent =