This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 91ae6776e [FLINK-39055][Iceberg] Support default column values in
Iceberg sink connector (#4277)
91ae6776e is described below
commit 91ae6776e97c7ae0f1d0f5a26813e9fdb46651e8
Author: suhwan <[email protected]>
AuthorDate: Tue Mar 10 15:46:45 2026 +0900
[FLINK-39055][Iceberg] Support default column values in Iceberg sink
connector (#4277)
---
.../iceberg/sink/IcebergMetadataApplier.java | 56 +++-
.../iceberg/sink/utils/IcebergTypeUtils.java | 67 +++++
.../iceberg/sink/IcebergMetadataApplierTest.java | 290 +++++++++++++++++++++
.../iceberg/sink/utils/IcebergTypeUtilsTest.java | 139 ++++++++++
4 files changed, 546 insertions(+), 6 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
index 24d01ffd0..1707e8c6a 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
@@ -38,6 +38,7 @@ import
org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -46,6 +47,7 @@ import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -178,7 +180,12 @@ public class IcebergMetadataApplier implements
MetadataApplier {
}
PartitionSpec partitionSpec = generatePartitionSpec(icebergSchema,
partitionColumns);
if (!catalog.tableExists(tableIdentifier)) {
- catalog.createTable(tableIdentifier, icebergSchema,
partitionSpec, tableOptions);
+ Table table =
+ catalog.createTable(
+ tableIdentifier, icebergSchema, partitionSpec,
tableOptions);
+
+ applyDefaultValues(table, cdcSchema);
+
LOG.info(
"Spend {} ms to create iceberg table {}",
System.currentTimeMillis() - startTimestamp,
@@ -189,6 +196,28 @@ public class IcebergMetadataApplier implements
MetadataApplier {
}
}
+ private void applyDefaultValues(
+ Table table, org.apache.flink.cdc.common.schema.Schema cdcSchema) { // Copies every parsable CDC column default onto the given Iceberg table.
+ if (getFormatVersion(table) < 3) { // nothing is applied to tables below format version 3
+ return;
+ }
+ UpdateSchema updateSchema = null; // created lazily: stays null when no column has a usable default
+ for (Column column : cdcSchema.getColumns()) {
+ Literal<?> defaultValue =
+ IcebergTypeUtils.parseDefaultValue(
+ column.getDefaultValueExpression(),
column.getType());
+ if (defaultValue != null) { // null: no default expression, or one that could not be parsed for this type
+ if (updateSchema == null) {
+ updateSchema = table.updateSchema();
+ }
+ updateSchema.updateColumnDefault(column.getName(),
defaultValue);
+ }
+ }
+ if (updateSchema != null) { // all collected defaults go into one schema commit
+ updateSchema.commit();
+ }
+ }
+
private void applyAddColumn(AddColumnEvent event) {
TableIdentifier tableIdentifier =
TableIdentifier.parse(event.tableId().identifier());
try {
@@ -212,16 +241,25 @@ public class IcebergMetadataApplier implements
MetadataApplier {
FlinkSchemaUtil.convert(
DataTypeUtils.toFlinkDataType(addColumn.getType())
.getLogicalType());
+ Literal<?> defaultValue =
+ IcebergTypeUtils.parseDefaultValue(
+ addColumn.getDefaultValueExpression(),
addColumn.getType());
+ if (defaultValue != null && getFormatVersion(table) >= 3) {
+ updateSchema.addColumn(columnName, icebergType,
columnComment, defaultValue);
+ updateSchema.updateColumnDefault(columnName, defaultValue);
+ } else {
+ updateSchema.addColumn(columnName, icebergType,
columnComment);
+ }
switch (columnWithPosition.getPosition()) {
case FIRST:
- updateSchema.addColumn(columnName, icebergType,
columnComment);
- table.updateSchema().moveFirst(columnName);
+ updateSchema.moveFirst(columnName);
break;
case LAST:
- updateSchema.addColumn(columnName, icebergType,
columnComment);
break;
case BEFORE:
- updateSchema.addColumn(columnName, icebergType,
columnComment);
+ checkNotNull(
+ columnWithPosition.getExistedColumnName(),
+ "Existing column name must be provided for
BEFORE position");
updateSchema.moveBefore(
columnName,
columnWithPosition.getExistedColumnName());
break;
@@ -229,7 +267,6 @@ public class IcebergMetadataApplier implements
MetadataApplier {
checkNotNull(
columnWithPosition.getExistedColumnName(),
"Existing column name must be provided for
AFTER position");
- updateSchema.addColumn(columnName, icebergType,
columnComment);
updateSchema.moveAfter(
columnName,
columnWithPosition.getExistedColumnName());
break;
@@ -364,6 +401,13 @@ public class IcebergMetadataApplier implements
MetadataApplier {
SchemaChangeEventType.ALTER_COLUMN_TYPE);
}
+ private int getFormatVersion(Table table) { // Reads the format version of the table; 2 is assumed when it cannot be read.
+ if (table instanceof HasTableOperations) { // the version is only reachable through the table's operations()
+ return ((HasTableOperations)
table).operations().current().formatVersion();
+ }
+ return 2; // below the ">= 3" threshold the callers test for, so default values are skipped
+ }
+
@Override
public void close() {
catalog = null;
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
index 102b4dbad..83678daa6 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
@@ -28,8 +28,13 @@ import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink;
import org.apache.flink.table.data.TimestampData;
+import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -41,6 +46,8 @@ import static
org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision;
/** Util class for types in {@link IcebergDataSink}. */
public class IcebergTypeUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergTypeUtils.class);
+
/** Convert column from Flink CDC to Iceberg format. */
public static Types.NestedField convertCdcColumnToIcebergField(
int index, PhysicalColumn column) {
@@ -53,6 +60,66 @@ public class IcebergTypeUtils {
column.getComment());
}
+ /**
+ * Parse a CDC default value expression string into an Iceberg {@link
Literal}. The type root of {@code cdcType} selects the rule: CHAR and VARCHAR
use the expression verbatim; BOOLEAN accepts only "true" or "false"
(case-insensitive); TINYINT, SMALLINT and INTEGER are parsed as int, BIGINT as
long, FLOAT as float and DOUBLE as double; DECIMAL is rescaled to the scale of
{@code cdcType} (0 if it reports none) with HALF_UP rounding.
+ *
+ * @return the parsed Literal, or null if the expression is null or cannot
be parsed for the
+ * given type. Unsupported type roots, invalid booleans and malformed numbers
are logged and yield null as well.
+ */
+ @Nullable
+ public static Literal<?> parseDefaultValue(
+ @Nullable String defaultValueExpression, DataType cdcType) {
+ if (defaultValueExpression == null) {
+ return null;
+ }
+ try {
+ switch (cdcType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ return Literal.of(defaultValueExpression);
+ case BOOLEAN: // strict: anything but true/false (e.g. "1", "yes") is rejected below
+ if ("true".equalsIgnoreCase(defaultValueExpression)) {
+ return Literal.of(true);
+ } else if
("false".equalsIgnoreCase(defaultValueExpression)) {
+ return Literal.of(false);
+ } else {
+ LOG.warn(
+ "Invalid boolean default value '{}', skipping
default value.",
+ defaultValueExpression);
+ return null;
+ }
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER: // all three share the int parser
+ return
Literal.of(Integer.parseInt(defaultValueExpression));
+ case BIGINT:
+ return Literal.of(Long.parseLong(defaultValueExpression));
+ case FLOAT:
+ return
Literal.of(Float.parseFloat(defaultValueExpression));
+ case DOUBLE:
+ return
Literal.of(Double.parseDouble(defaultValueExpression));
+ case DECIMAL:
+ int scale = DataTypes.getScale(cdcType).orElse(0); // falls back to scale 0 when getScale yields no value
+ return Literal.of(
+ new java.math.BigDecimal(defaultValueExpression)
+ .setScale(scale,
java.math.RoundingMode.HALF_UP));
+ default: // remaining type roots have no parsing rule here
+ LOG.debug(
+ "Unsupported default value type {} for expression
'{}', skipping default value.",
+ cdcType.getTypeRoot(),
+ defaultValueExpression);
+ return null;
+ }
+ } catch (NumberFormatException e) { // raised by the numeric parsers and the BigDecimal constructor above
+ LOG.warn(
+ "Failed to parse default value '{}' for type {}, skipping
default value.",
+ defaultValueExpression,
+ cdcType.getTypeRoot(),
+ e);
+ return null;
+ }
+ }
+
/**
* Convert data type from Flink CDC to Iceberg format, refer to <a
*
href="https://iceberg.apache.org/docs/nightly/flink/#flink-to-iceberg">...</a>.
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java
index 6a491e219..726a303b4 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java
@@ -35,12 +35,14 @@ import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.testcontainers.lifecycle.Startables;
import java.io.File;
+import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -434,4 +436,292 @@ public class IcebergMetadataApplierTest {
new HashSet<>(Collections.singletonList(1)));
assertThat(table.schema().sameSchema(schema)).isTrue();
}
+
+ @Test
+ public void testApplySchemaChangeWithDefaultValuesFormatV3() { // Creates a format-version-3 table with column defaults, then adds a column with a default, checking the resulting Iceberg schema each time.
+ Map<String, String> catalogOptions = new HashMap<>();
+ String warehouse =
+ new File(temporaryFolder.toFile(),
UUID.randomUUID().toString()).toString();
+ catalogOptions.put("type", "hadoop");
+ catalogOptions.put("warehouse", warehouse);
+ catalogOptions.put("cache-enabled", "false");
+
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put("format-version", "3"); // the applier only writes defaults when the table reports format version >= 3
+
+ Catalog catalog =
+ CatalogUtil.buildIcebergCatalog(
+ "cdc-iceberg-catalog", catalogOptions, new
Configuration());
+
+ IcebergMetadataApplier icebergMetadataApplier =
+ new IcebergMetadataApplier(catalogOptions, tableOptions, new
HashMap<>());
+ String defaultTableId = "test.iceberg_table";
+ TableId tableId = TableId.parse(defaultTableId);
+
+ // Create Table with default values (the last physicalColumn argument is the default value expression).
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ tableId,
+ Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ DataTypes.BIGINT().notNull(),
+ "column for id",
+ "AUTO_DECREMENT()") // function-style default: not a BIGINT literal, so "id" is expected without a default
+ .physicalColumn(
+ "name",
+ DataTypes.VARCHAR(255).notNull(),
+ "column for name",
+ "John Smith")
+ .physicalColumn(
+ "tinyIntCol",
+ DataTypes.TINYINT(),
+ "column for tinyIntCol",
+ "1")
+ .physicalColumn(
+ "description",
+ DataTypes.STRING(),
+ "column for descriptions",
+ "not important")
+ .physicalColumn(
+ "bool_column",
+ DataTypes.BOOLEAN(),
+ "column for bool",
+ "false")
+ .physicalColumn(
+ "float_column",
+ DataTypes.FLOAT(),
+ "column for float",
+ "1.0")
+ .physicalColumn(
+ "double_column",
+ DataTypes.DOUBLE(),
+ "column for double",
+ "1.0")
+ .physicalColumn(
+ "decimal_column",
+ DataTypes.DECIMAL(10, 2),
+ "column for decimal",
+ "1.0")
+ .primaryKey("id")
+ .partitionKey("id", "name")
+ .build());
+ icebergMetadataApplier.applySchemaChange(createTableEvent);
+ Table table = catalog.loadTable(TableIdentifier.parse(defaultTableId));
+
+ // Verify default values are applied on format v3: columns present at creation are expected to carry a write default only.
+ org.apache.iceberg.Schema schemaWithDefaults =
+ new org.apache.iceberg.Schema(
+ 0,
+ Arrays.asList(
+ Types.NestedField.builder()
+ .withId(1)
+ .asRequired()
+ .withName("id")
+ .ofType(Types.LongType.get())
+ .withDoc("column for id")
+ .build(),
+ Types.NestedField.builder()
+ .withId(2)
+ .asRequired()
+ .withName("name")
+ .ofType(Types.StringType.get())
+ .withDoc("column for name")
+ .withWriteDefault(Literal.of("John
Smith"))
+ .build(),
+ Types.NestedField.builder()
+ .withId(3)
+ .asOptional()
+ .withName("tinyIntCol")
+ .ofType(Types.IntegerType.get())
+ .withDoc("column for tinyIntCol")
+ .withWriteDefault(Literal.of(1))
+ .build(),
+ Types.NestedField.builder()
+ .withId(4)
+ .asOptional()
+ .withName("description")
+ .ofType(Types.StringType.get())
+ .withDoc("column for descriptions")
+ .withWriteDefault(Literal.of("not
important"))
+ .build(),
+ Types.NestedField.builder()
+ .withId(5)
+ .asOptional()
+ .withName("bool_column")
+ .ofType(Types.BooleanType.get())
+ .withDoc("column for bool")
+ .withWriteDefault(Literal.of(false))
+ .build(),
+ Types.NestedField.builder()
+ .withId(6)
+ .asOptional()
+ .withName("float_column")
+ .ofType(Types.FloatType.get())
+ .withDoc("column for float")
+ .withWriteDefault(Literal.of(1.0f))
+ .build(),
+ Types.NestedField.builder()
+ .withId(7)
+ .asOptional()
+ .withName("double_column")
+ .ofType(Types.DoubleType.get())
+ .withDoc("column for double")
+ .withWriteDefault(Literal.of(1.0d))
+ .build(),
+ Types.NestedField.builder()
+ .withId(8)
+ .asOptional()
+ .withName("decimal_column")
+ .ofType(Types.DecimalType.of(10, 2))
+ .withDoc("column for decimal")
+ .withWriteDefault(Literal.of(new
BigDecimal("1.00")))
+ .build()),
+ new HashSet<>(Collections.singletonList(1)));
+ assertThat(table.schema().sameSchema(schemaWithDefaults)).isTrue();
+
+ // Verify that schema without default values is NOT the same, so the positive check above cannot pass by ignoring defaults.
+ org.apache.iceberg.Schema schemaWithoutDefaults =
+ new org.apache.iceberg.Schema(
+ 0,
+ Arrays.asList(
+ Types.NestedField.of(
+ 1, false, "id", Types.LongType.get(),
"column for id"),
+ Types.NestedField.of(
+ 2,
+ false,
+ "name",
+ Types.StringType.get(),
+ "column for name"),
+ Types.NestedField.of(
+ 3,
+ true,
+ "tinyIntCol",
+ Types.IntegerType.get(),
+ "column for tinyIntCol"),
+ Types.NestedField.of(
+ 4,
+ true,
+ "description",
+ Types.StringType.get(),
+ "column for descriptions"),
+ Types.NestedField.of(
+ 5,
+ true,
+ "bool_column",
+ Types.BooleanType.get(),
+ "column for bool"),
+ Types.NestedField.of(
+ 6,
+ true,
+ "float_column",
+ Types.FloatType.get(),
+ "column for float"),
+ Types.NestedField.of(
+ 7,
+ true,
+ "double_column",
+ Types.DoubleType.get(),
+ "column for double"),
+ Types.NestedField.of(
+ 8,
+ true,
+ "decimal_column",
+ Types.DecimalType.of(10, 2),
+ "column for decimal")),
+ new HashSet<>(Collections.singletonList(1)));
+ assertThat(table.schema().sameSchema(schemaWithoutDefaults)).isFalse();
+
+ // Add column with default value on format v3: the new column is expected to carry both an initial and a write default.
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ AddColumnEvent.last(
+ new PhysicalColumn(
+ "newIntColumn",
+ DataTypes.INT(),
+ "comment for newIntColumn",
+ "42"))));
+ icebergMetadataApplier.applySchemaChange(addColumnEvent);
+ table = catalog.loadTable(TableIdentifier.parse(defaultTableId));
+ org.apache.iceberg.Schema schemaAfterAddColumn =
+ new org.apache.iceberg.Schema(
+ 0,
+ Arrays.asList(
+ Types.NestedField.builder()
+ .withId(1)
+ .asRequired()
+ .withName("id")
+ .ofType(Types.LongType.get())
+ .withDoc("column for id")
+ .build(),
+ Types.NestedField.builder()
+ .withId(2)
+ .asRequired()
+ .withName("name")
+ .ofType(Types.StringType.get())
+ .withDoc("column for name")
+ .withWriteDefault(Literal.of("John
Smith"))
+ .build(),
+ Types.NestedField.builder()
+ .withId(3)
+ .asOptional()
+ .withName("tinyIntCol")
+ .ofType(Types.IntegerType.get())
+ .withDoc("column for tinyIntCol")
+ .withWriteDefault(Literal.of(1))
+ .build(),
+ Types.NestedField.builder()
+ .withId(4)
+ .asOptional()
+ .withName("description")
+ .ofType(Types.StringType.get())
+ .withDoc("column for descriptions")
+ .withWriteDefault(Literal.of("not
important"))
+ .build(),
+ Types.NestedField.builder()
+ .withId(5)
+ .asOptional()
+ .withName("bool_column")
+ .ofType(Types.BooleanType.get())
+ .withDoc("column for bool")
+ .withWriteDefault(Literal.of(false))
+ .build(),
+ Types.NestedField.builder()
+ .withId(6)
+ .asOptional()
+ .withName("float_column")
+ .ofType(Types.FloatType.get())
+ .withDoc("column for float")
+ .withWriteDefault(Literal.of(1.0f))
+ .build(),
+ Types.NestedField.builder()
+ .withId(7)
+ .asOptional()
+ .withName("double_column")
+ .ofType(Types.DoubleType.get())
+ .withDoc("column for double")
+ .withWriteDefault(Literal.of(1.0d))
+ .build(),
+ Types.NestedField.builder()
+ .withId(8)
+ .asOptional()
+ .withName("decimal_column")
+ .ofType(Types.DecimalType.of(10, 2))
+ .withDoc("column for decimal")
+ .withWriteDefault(Literal.of(new
BigDecimal("1.00")))
+ .build(),
+ Types.NestedField.builder()
+ .withId(9)
+ .asOptional()
+ .withName("newIntColumn")
+ .ofType(Types.IntegerType.get())
+ .withDoc("comment for newIntColumn")
+ .withInitialDefault(Literal.of(42)) // only the column added after creation is expected to have an initial default
+ .withWriteDefault(Literal.of(42))
+ .build()),
+ new HashSet<>(Collections.singletonList(1)));
+ assertThat(table.schema().sameSchema(schemaAfterAddColumn)).isTrue();
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java
new file mode 100644
index 000000000..faafcc5a5
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.iceberg.sink.utils;
+
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import org.apache.iceberg.expressions.Literal;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link IcebergTypeUtils}: {@code parseDefaultValue} on supported types and on input it is expected to reject with null. */
+public class IcebergTypeUtilsTest {
+
+ @Test
+ public void testParseDefaultValueNull() {
+ assertThat(IcebergTypeUtils.parseDefaultValue(null,
DataTypes.STRING())).isNull();
+ }
+
+ @Test
+ public void testParseDefaultValueString() {
+ Literal<?> result = IcebergTypeUtils.parseDefaultValue("hello",
DataTypes.STRING());
+ assertThat(result).isNotNull();
+ assertThat(result.value().toString()).isEqualTo("hello"); // compared via toString() rather than against the literal's value object
+ }
+
+ @Test
+ public void testParseDefaultValueVarchar() {
+ Literal<?> result =
+ IcebergTypeUtils.parseDefaultValue("John Smith",
DataTypes.VARCHAR(255));
+ assertThat(result).isNotNull();
+ assertThat(result.value().toString()).isEqualTo("John Smith");
+ }
+
+ @Test
+ public void testParseDefaultValueBoolean() {
+ Literal<?> result = IcebergTypeUtils.parseDefaultValue("true",
DataTypes.BOOLEAN());
+ assertThat(result).isNotNull();
+ assertThat(result.value()).isEqualTo(true);
+
+ result = IcebergTypeUtils.parseDefaultValue("false",
DataTypes.BOOLEAN());
+ assertThat(result).isNotNull();
+ assertThat(result.value()).isEqualTo(false);
+ }
+
+ @Test
+ public void testParseDefaultValueBooleanInvalid() {
+ // Non-canonical boolean values should return null: only "true"/"false" are accepted, not numeric or yes/no spellings
+ assertThat(IcebergTypeUtils.parseDefaultValue("1",
DataTypes.BOOLEAN())).isNull();
+ assertThat(IcebergTypeUtils.parseDefaultValue("yes",
DataTypes.BOOLEAN())).isNull();
+ assertThat(IcebergTypeUtils.parseDefaultValue("garbage",
DataTypes.BOOLEAN())).isNull();
+ }
+
+ @Test
+ public void testParseDefaultValueInteger() {
+ Literal<?> result = IcebergTypeUtils.parseDefaultValue("42",
DataTypes.INT());
+ assertThat(result).isNotNull();
+ assertThat(result.value()).isEqualTo(42);
+ }
+
+ @Test
+ public void testParseDefaultValueTinyInt() {
+ Literal<?> result = IcebergTypeUtils.parseDefaultValue("1",
DataTypes.TINYINT());
+ assertThat(result).isNotNull();
+ assertThat(result.value()).isEqualTo(1); // TINYINT defaults are expected as an int-valued literal
+ }
+
+ @Test
+ public void testParseDefaultValueSmallInt() {
+ Literal<?> result = IcebergTypeUtils.parseDefaultValue("100",
DataTypes.SMALLINT());
+ assertThat(result).isNotNull();
+ assertThat(result.value()).isEqualTo(100); // SMALLINT defaults are expected as an int-valued literal
+ }
+
+ @Test
+ public void testParseDefaultValueBigInt() {
+ Literal<?> result = IcebergTypeUtils.parseDefaultValue("9999999999",
DataTypes.BIGINT());
+ assertThat(result).isNotNull();
+ assertThat(result.value()).isEqualTo(9999999999L); // value beyond the int range, so it must come back as a long
+ }
+
+ @Test
+ public void testParseDefaultValueFloat() {
+ Literal<?> result = IcebergTypeUtils.parseDefaultValue("1.5",
DataTypes.FLOAT());
+ assertThat(result).isNotNull();
+ assertThat(result.value()).isEqualTo(1.5f);
+ }
+
+ @Test
+ public void testParseDefaultValueDouble() {
+ Literal<?> result = IcebergTypeUtils.parseDefaultValue("3.14",
DataTypes.DOUBLE());
+ assertThat(result).isNotNull();
+ assertThat(result.value()).isEqualTo(3.14);
+ }
+
+ @Test
+ public void testParseDefaultValueDecimal() {
+ Literal<?> result = IcebergTypeUtils.parseDefaultValue("1.23",
DataTypes.DECIMAL(10, 2));
+ assertThat(result).isNotNull();
+ assertThat(result.value()).isEqualTo(new BigDecimal("1.23")); // input already has the column scale (2), so no rounding is exercised here
+ }
+
+ @Test
+ public void testParseDefaultValueUnsupportedType() {
+ Literal<?> result = IcebergTypeUtils.parseDefaultValue("data",
DataTypes.BYTES());
+ assertThat(result).isNull(); // a type without a parsing rule is expected to be skipped, not to throw
+ }
+
+ @Test
+ public void testParseDefaultValueInvalidNumber() {
+ Literal<?> result = IcebergTypeUtils.parseDefaultValue("not_a_number",
DataTypes.INT());
+ assertThat(result).isNull(); // malformed numeric text is expected to yield null rather than an exception
+ }
+
+ @Test
+ public void testParseDefaultValueFunctionExpression() {
+ // Function expressions like AUTO_DECREMENT() cannot be parsed as
numbers
+ Literal<?> result =
+ IcebergTypeUtils.parseDefaultValue("AUTO_DECREMENT()",
DataTypes.BIGINT());
+ assertThat(result).isNull();
+ }
+}