This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 91ae6776e [FLINK-39055][Iceberg] Support default column values in 
Iceberg sink connector (#4277)
91ae6776e is described below

commit 91ae6776e97c7ae0f1d0f5a26813e9fdb46651e8
Author: suhwan <[email protected]>
AuthorDate: Tue Mar 10 15:46:45 2026 +0900

    [FLINK-39055][Iceberg] Support default column values in Iceberg sink 
connector (#4277)
---
 .../iceberg/sink/IcebergMetadataApplier.java       |  56 +++-
 .../iceberg/sink/utils/IcebergTypeUtils.java       |  67 +++++
 .../iceberg/sink/IcebergMetadataApplierTest.java   | 290 +++++++++++++++++++++
 .../iceberg/sink/utils/IcebergTypeUtilsTest.java   | 139 ++++++++++
 4 files changed, 546 insertions(+), 6 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
index 24d01ffd0..1707e8c6a 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -46,6 +47,7 @@ import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -178,7 +180,12 @@ public class IcebergMetadataApplier implements 
MetadataApplier {
             }
             PartitionSpec partitionSpec = generatePartitionSpec(icebergSchema, 
partitionColumns);
             if (!catalog.tableExists(tableIdentifier)) {
-                catalog.createTable(tableIdentifier, icebergSchema, 
partitionSpec, tableOptions);
+                Table table =
+                        catalog.createTable(
+                                tableIdentifier, icebergSchema, partitionSpec, 
tableOptions);
+
+                applyDefaultValues(table, cdcSchema);
+
                 LOG.info(
                         "Spend {} ms to create iceberg table {}",
                         System.currentTimeMillis() - startTimestamp,
@@ -189,6 +196,28 @@ public class IcebergMetadataApplier implements 
MetadataApplier {
         }
     }
 
+    private void applyDefaultValues(
+            Table table, org.apache.flink.cdc.common.schema.Schema cdcSchema) {
+        if (getFormatVersion(table) < 3) {
+            return;
+        }
+        UpdateSchema updateSchema = null;
+        for (Column column : cdcSchema.getColumns()) {
+            Literal<?> defaultValue =
+                    IcebergTypeUtils.parseDefaultValue(
+                            column.getDefaultValueExpression(), 
column.getType());
+            if (defaultValue != null) {
+                if (updateSchema == null) {
+                    updateSchema = table.updateSchema();
+                }
+                updateSchema.updateColumnDefault(column.getName(), 
defaultValue);
+            }
+        }
+        if (updateSchema != null) {
+            updateSchema.commit();
+        }
+    }
+
     private void applyAddColumn(AddColumnEvent event) {
         TableIdentifier tableIdentifier = 
TableIdentifier.parse(event.tableId().identifier());
         try {
@@ -212,16 +241,25 @@ public class IcebergMetadataApplier implements 
MetadataApplier {
                         FlinkSchemaUtil.convert(
                                 
DataTypeUtils.toFlinkDataType(addColumn.getType())
                                         .getLogicalType());
+                Literal<?> defaultValue =
+                        IcebergTypeUtils.parseDefaultValue(
+                                addColumn.getDefaultValueExpression(), 
addColumn.getType());
+                if (defaultValue != null && getFormatVersion(table) >= 3) {
+                    updateSchema.addColumn(columnName, icebergType, 
columnComment, defaultValue);
+                    updateSchema.updateColumnDefault(columnName, defaultValue);
+                } else {
+                    updateSchema.addColumn(columnName, icebergType, 
columnComment);
+                }
                 switch (columnWithPosition.getPosition()) {
                     case FIRST:
-                        updateSchema.addColumn(columnName, icebergType, 
columnComment);
-                        table.updateSchema().moveFirst(columnName);
+                        updateSchema.moveFirst(columnName);
                         break;
                     case LAST:
-                        updateSchema.addColumn(columnName, icebergType, 
columnComment);
                         break;
                     case BEFORE:
-                        updateSchema.addColumn(columnName, icebergType, 
columnComment);
+                        checkNotNull(
+                                columnWithPosition.getExistedColumnName(),
+                                "Existing column name must be provided for 
BEFORE position");
                         updateSchema.moveBefore(
                                 columnName, 
columnWithPosition.getExistedColumnName());
                         break;
@@ -229,7 +267,6 @@ public class IcebergMetadataApplier implements 
MetadataApplier {
                         checkNotNull(
                                 columnWithPosition.getExistedColumnName(),
                                 "Existing column name must be provided for 
AFTER position");
-                        updateSchema.addColumn(columnName, icebergType, 
columnComment);
                         updateSchema.moveAfter(
                                 columnName, 
columnWithPosition.getExistedColumnName());
                         break;
@@ -364,6 +401,13 @@ public class IcebergMetadataApplier implements 
MetadataApplier {
                 SchemaChangeEventType.ALTER_COLUMN_TYPE);
     }
 
+    private int getFormatVersion(Table table) {
+        if (table instanceof HasTableOperations) {
+            return ((HasTableOperations) 
table).operations().current().formatVersion();
+        }
+        return 2;
+    }
+
     @Override
     public void close() {
         catalog = null;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
index 102b4dbad..83678daa6 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java
@@ -28,8 +28,13 @@ import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink;
 import org.apache.flink.table.data.TimestampData;
 
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -41,6 +46,8 @@ import static 
org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision;
 /** Util class for types in {@link IcebergDataSink}. */
 public class IcebergTypeUtils {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergTypeUtils.class);
+
     /** Convert column from Flink CDC to Iceberg format. */
     public static Types.NestedField convertCdcColumnToIcebergField(
             int index, PhysicalColumn column) {
@@ -53,6 +60,66 @@ public class IcebergTypeUtils {
                 column.getComment());
     }
 
+    /**
+     * Parse a CDC default value expression string into an Iceberg {@link 
Literal}.
+     *
+     * @return the parsed Literal, or null if the expression is null or cannot 
be parsed for the
+     *     given type.
+     */
+    @Nullable
+    public static Literal<?> parseDefaultValue(
+            @Nullable String defaultValueExpression, DataType cdcType) {
+        if (defaultValueExpression == null) {
+            return null;
+        }
+        try {
+            switch (cdcType.getTypeRoot()) {
+                case CHAR:
+                case VARCHAR:
+                    return Literal.of(defaultValueExpression);
+                case BOOLEAN:
+                    if ("true".equalsIgnoreCase(defaultValueExpression)) {
+                        return Literal.of(true);
+                    } else if 
("false".equalsIgnoreCase(defaultValueExpression)) {
+                        return Literal.of(false);
+                    } else {
+                        LOG.warn(
+                                "Invalid boolean default value '{}', skipping 
default value.",
+                                defaultValueExpression);
+                        return null;
+                    }
+                case TINYINT:
+                case SMALLINT:
+                case INTEGER:
+                    return 
Literal.of(Integer.parseInt(defaultValueExpression));
+                case BIGINT:
+                    return Literal.of(Long.parseLong(defaultValueExpression));
+                case FLOAT:
+                    return 
Literal.of(Float.parseFloat(defaultValueExpression));
+                case DOUBLE:
+                    return 
Literal.of(Double.parseDouble(defaultValueExpression));
+                case DECIMAL:
+                    int scale = DataTypes.getScale(cdcType).orElse(0);
+                    return Literal.of(
+                            new java.math.BigDecimal(defaultValueExpression)
+                                    .setScale(scale, 
java.math.RoundingMode.HALF_UP));
+                default:
+                    LOG.debug(
+                            "Unsupported default value type {} for expression 
'{}', skipping default value.",
+                            cdcType.getTypeRoot(),
+                            defaultValueExpression);
+                    return null;
+            }
+        } catch (NumberFormatException e) {
+            LOG.warn(
+                    "Failed to parse default value '{}' for type {}, skipping 
default value.",
+                    defaultValueExpression,
+                    cdcType.getTypeRoot(),
+                    e);
+            return null;
+        }
+    }
+
     /**
      * Convert data type from Flink CDC to Iceberg format, refer to <a
      * 
href="https://iceberg.apache.org/docs/nightly/flink/#flink-to-iceberg";>...</a>.
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java
index 6a491e219..726a303b4 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java
@@ -35,12 +35,14 @@ import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.testcontainers.lifecycle.Startables;
 
 import java.io.File;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -434,4 +436,292 @@ public class IcebergMetadataApplierTest {
                         new HashSet<>(Collections.singletonList(1)));
         assertThat(table.schema().sameSchema(schema)).isTrue();
     }
+
+    @Test
+    public void testApplySchemaChangeWithDefaultValuesFormatV3() {
+        Map<String, String> catalogOptions = new HashMap<>();
+        String warehouse =
+                new File(temporaryFolder.toFile(), 
UUID.randomUUID().toString()).toString();
+        catalogOptions.put("type", "hadoop");
+        catalogOptions.put("warehouse", warehouse);
+        catalogOptions.put("cache-enabled", "false");
+
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put("format-version", "3");
+
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog(
+                        "cdc-iceberg-catalog", catalogOptions, new 
Configuration());
+
+        IcebergMetadataApplier icebergMetadataApplier =
+                new IcebergMetadataApplier(catalogOptions, tableOptions, new 
HashMap<>());
+        String defaultTableId = "test.iceberg_table";
+        TableId tableId = TableId.parse(defaultTableId);
+
+        // Create Table with default values.
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(
+                        tableId,
+                        Schema.newBuilder()
+                                .physicalColumn(
+                                        "id",
+                                        DataTypes.BIGINT().notNull(),
+                                        "column for id",
+                                        "AUTO_DECREMENT()")
+                                .physicalColumn(
+                                        "name",
+                                        DataTypes.VARCHAR(255).notNull(),
+                                        "column for name",
+                                        "John Smith")
+                                .physicalColumn(
+                                        "tinyIntCol",
+                                        DataTypes.TINYINT(),
+                                        "column for tinyIntCol",
+                                        "1")
+                                .physicalColumn(
+                                        "description",
+                                        DataTypes.STRING(),
+                                        "column for descriptions",
+                                        "not important")
+                                .physicalColumn(
+                                        "bool_column",
+                                        DataTypes.BOOLEAN(),
+                                        "column for bool",
+                                        "false")
+                                .physicalColumn(
+                                        "float_column",
+                                        DataTypes.FLOAT(),
+                                        "column for float",
+                                        "1.0")
+                                .physicalColumn(
+                                        "double_column",
+                                        DataTypes.DOUBLE(),
+                                        "column for double",
+                                        "1.0")
+                                .physicalColumn(
+                                        "decimal_column",
+                                        DataTypes.DECIMAL(10, 2),
+                                        "column for decimal",
+                                        "1.0")
+                                .primaryKey("id")
+                                .partitionKey("id", "name")
+                                .build());
+        icebergMetadataApplier.applySchemaChange(createTableEvent);
+        Table table = catalog.loadTable(TableIdentifier.parse(defaultTableId));
+
+        // Verify default values are applied on format v3.
+        org.apache.iceberg.Schema schemaWithDefaults =
+                new org.apache.iceberg.Schema(
+                        0,
+                        Arrays.asList(
+                                Types.NestedField.builder()
+                                        .withId(1)
+                                        .asRequired()
+                                        .withName("id")
+                                        .ofType(Types.LongType.get())
+                                        .withDoc("column for id")
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(2)
+                                        .asRequired()
+                                        .withName("name")
+                                        .ofType(Types.StringType.get())
+                                        .withDoc("column for name")
+                                        .withWriteDefault(Literal.of("John 
Smith"))
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(3)
+                                        .asOptional()
+                                        .withName("tinyIntCol")
+                                        .ofType(Types.IntegerType.get())
+                                        .withDoc("column for tinyIntCol")
+                                        .withWriteDefault(Literal.of(1))
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(4)
+                                        .asOptional()
+                                        .withName("description")
+                                        .ofType(Types.StringType.get())
+                                        .withDoc("column for descriptions")
+                                        .withWriteDefault(Literal.of("not 
important"))
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(5)
+                                        .asOptional()
+                                        .withName("bool_column")
+                                        .ofType(Types.BooleanType.get())
+                                        .withDoc("column for bool")
+                                        .withWriteDefault(Literal.of(false))
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(6)
+                                        .asOptional()
+                                        .withName("float_column")
+                                        .ofType(Types.FloatType.get())
+                                        .withDoc("column for float")
+                                        .withWriteDefault(Literal.of(1.0f))
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(7)
+                                        .asOptional()
+                                        .withName("double_column")
+                                        .ofType(Types.DoubleType.get())
+                                        .withDoc("column for double")
+                                        .withWriteDefault(Literal.of(1.0d))
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(8)
+                                        .asOptional()
+                                        .withName("decimal_column")
+                                        .ofType(Types.DecimalType.of(10, 2))
+                                        .withDoc("column for decimal")
+                                        .withWriteDefault(Literal.of(new 
BigDecimal("1.00")))
+                                        .build()),
+                        new HashSet<>(Collections.singletonList(1)));
+        assertThat(table.schema().sameSchema(schemaWithDefaults)).isTrue();
+
+        // Verify that schema without default values is NOT the same.
+        org.apache.iceberg.Schema schemaWithoutDefaults =
+                new org.apache.iceberg.Schema(
+                        0,
+                        Arrays.asList(
+                                Types.NestedField.of(
+                                        1, false, "id", Types.LongType.get(), 
"column for id"),
+                                Types.NestedField.of(
+                                        2,
+                                        false,
+                                        "name",
+                                        Types.StringType.get(),
+                                        "column for name"),
+                                Types.NestedField.of(
+                                        3,
+                                        true,
+                                        "tinyIntCol",
+                                        Types.IntegerType.get(),
+                                        "column for tinyIntCol"),
+                                Types.NestedField.of(
+                                        4,
+                                        true,
+                                        "description",
+                                        Types.StringType.get(),
+                                        "column for descriptions"),
+                                Types.NestedField.of(
+                                        5,
+                                        true,
+                                        "bool_column",
+                                        Types.BooleanType.get(),
+                                        "column for bool"),
+                                Types.NestedField.of(
+                                        6,
+                                        true,
+                                        "float_column",
+                                        Types.FloatType.get(),
+                                        "column for float"),
+                                Types.NestedField.of(
+                                        7,
+                                        true,
+                                        "double_column",
+                                        Types.DoubleType.get(),
+                                        "column for double"),
+                                Types.NestedField.of(
+                                        8,
+                                        true,
+                                        "decimal_column",
+                                        Types.DecimalType.of(10, 2),
+                                        "column for decimal")),
+                        new HashSet<>(Collections.singletonList(1)));
+        assertThat(table.schema().sameSchema(schemaWithoutDefaults)).isFalse();
+
+        // Add column with default value on format v3.
+        AddColumnEvent addColumnEvent =
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                AddColumnEvent.last(
+                                        new PhysicalColumn(
+                                                "newIntColumn",
+                                                DataTypes.INT(),
+                                                "comment for newIntColumn",
+                                                "42"))));
+        icebergMetadataApplier.applySchemaChange(addColumnEvent);
+        table = catalog.loadTable(TableIdentifier.parse(defaultTableId));
+        org.apache.iceberg.Schema schemaAfterAddColumn =
+                new org.apache.iceberg.Schema(
+                        0,
+                        Arrays.asList(
+                                Types.NestedField.builder()
+                                        .withId(1)
+                                        .asRequired()
+                                        .withName("id")
+                                        .ofType(Types.LongType.get())
+                                        .withDoc("column for id")
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(2)
+                                        .asRequired()
+                                        .withName("name")
+                                        .ofType(Types.StringType.get())
+                                        .withDoc("column for name")
+                                        .withWriteDefault(Literal.of("John 
Smith"))
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(3)
+                                        .asOptional()
+                                        .withName("tinyIntCol")
+                                        .ofType(Types.IntegerType.get())
+                                        .withDoc("column for tinyIntCol")
+                                        .withWriteDefault(Literal.of(1))
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(4)
+                                        .asOptional()
+                                        .withName("description")
+                                        .ofType(Types.StringType.get())
+                                        .withDoc("column for descriptions")
+                                        .withWriteDefault(Literal.of("not 
important"))
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(5)
+                                        .asOptional()
+                                        .withName("bool_column")
+                                        .ofType(Types.BooleanType.get())
+                                        .withDoc("column for bool")
+                                        .withWriteDefault(Literal.of(false))
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(6)
+                                        .asOptional()
+                                        .withName("float_column")
+                                        .ofType(Types.FloatType.get())
+                                        .withDoc("column for float")
+                                        .withWriteDefault(Literal.of(1.0f))
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(7)
+                                        .asOptional()
+                                        .withName("double_column")
+                                        .ofType(Types.DoubleType.get())
+                                        .withDoc("column for double")
+                                        .withWriteDefault(Literal.of(1.0d))
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(8)
+                                        .asOptional()
+                                        .withName("decimal_column")
+                                        .ofType(Types.DecimalType.of(10, 2))
+                                        .withDoc("column for decimal")
+                                        .withWriteDefault(Literal.of(new 
BigDecimal("1.00")))
+                                        .build(),
+                                Types.NestedField.builder()
+                                        .withId(9)
+                                        .asOptional()
+                                        .withName("newIntColumn")
+                                        .ofType(Types.IntegerType.get())
+                                        .withDoc("comment for newIntColumn")
+                                        .withInitialDefault(Literal.of(42))
+                                        .withWriteDefault(Literal.of(42))
+                                        .build()),
+                        new HashSet<>(Collections.singletonList(1)));
+        assertThat(table.schema().sameSchema(schemaAfterAddColumn)).isTrue();
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java
new file mode 100644
index 000000000..faafcc5a5
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.iceberg.sink.utils;
+
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import org.apache.iceberg.expressions.Literal;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link IcebergTypeUtils}. */
+public class IcebergTypeUtilsTest {
+
+    @Test
+    public void testParseDefaultValueNull() {
+        assertThat(IcebergTypeUtils.parseDefaultValue(null, DataTypes.STRING())).isNull();
+    }
+
+    @Test
+    public void testParseDefaultValueString() {
+        Literal<?> result = IcebergTypeUtils.parseDefaultValue("hello", DataTypes.STRING());
+        assertThat(result).isNotNull();
+        assertThat(result.value().toString()).isEqualTo("hello");
+    }
+
+    @Test
+    public void testParseDefaultValueVarchar() {
+        Literal<?> result =
+                IcebergTypeUtils.parseDefaultValue("John Smith", DataTypes.VARCHAR(255));
+        assertThat(result).isNotNull();
+        assertThat(result.value().toString()).isEqualTo("John Smith");
+    }
+
+    @Test
+    public void testParseDefaultValueBoolean() {
+        Literal<?> result = IcebergTypeUtils.parseDefaultValue("true", DataTypes.BOOLEAN());
+        assertThat(result).isNotNull();
+        assertThat(result.value()).isEqualTo(true);
+
+        result = IcebergTypeUtils.parseDefaultValue("false", DataTypes.BOOLEAN());
+        assertThat(result).isNotNull();
+        assertThat(result.value()).isEqualTo(false);
+    }
+
+    @Test
+    public void testParseDefaultValueBooleanInvalid() {
+        // Non-canonical boolean values should return null
+        assertThat(IcebergTypeUtils.parseDefaultValue("1", DataTypes.BOOLEAN())).isNull();
+        assertThat(IcebergTypeUtils.parseDefaultValue("yes", DataTypes.BOOLEAN())).isNull();
+        assertThat(IcebergTypeUtils.parseDefaultValue("garbage", DataTypes.BOOLEAN())).isNull();
+    }
+
+    @Test
+    public void testParseDefaultValueInteger() {
+        Literal<?> result = IcebergTypeUtils.parseDefaultValue("42", DataTypes.INT());
+        assertThat(result).isNotNull();
+        assertThat(result.value()).isEqualTo(42);
+    }
+
+    @Test
+    public void testParseDefaultValueTinyInt() {
+        Literal<?> result = IcebergTypeUtils.parseDefaultValue("1", DataTypes.TINYINT());
+        assertThat(result).isNotNull();
+        assertThat(result.value()).isEqualTo(1);
+    }
+
+    @Test
+    public void testParseDefaultValueSmallInt() {
+        Literal<?> result = IcebergTypeUtils.parseDefaultValue("100", DataTypes.SMALLINT());
+        assertThat(result).isNotNull();
+        assertThat(result.value()).isEqualTo(100);
+    }
+
+    @Test
+    public void testParseDefaultValueBigInt() {
+        Literal<?> result = IcebergTypeUtils.parseDefaultValue("9999999999", DataTypes.BIGINT());
+        assertThat(result).isNotNull();
+        assertThat(result.value()).isEqualTo(9999999999L);
+    }
+
+    @Test
+    public void testParseDefaultValueFloat() {
+        Literal<?> result = IcebergTypeUtils.parseDefaultValue("1.5", DataTypes.FLOAT());
+        assertThat(result).isNotNull();
+        assertThat(result.value()).isEqualTo(1.5f);
+    }
+
+    @Test
+    public void testParseDefaultValueDouble() {
+        Literal<?> result = IcebergTypeUtils.parseDefaultValue("3.14", DataTypes.DOUBLE());
+        assertThat(result).isNotNull();
+        assertThat(result.value()).isEqualTo(3.14);
+    }
+
+    @Test
+    public void testParseDefaultValueDecimal() {
+        Literal<?> result = IcebergTypeUtils.parseDefaultValue("1.23", DataTypes.DECIMAL(10, 2));
+        assertThat(result).isNotNull();
+        assertThat(result.value()).isEqualTo(new BigDecimal("1.23"));
+    }
+
+    @Test
+    public void testParseDefaultValueUnsupportedType() {
+        Literal<?> result = IcebergTypeUtils.parseDefaultValue("data", DataTypes.BYTES());
+        assertThat(result).isNull();
+    }
+
+    @Test
+    public void testParseDefaultValueInvalidNumber() {
+        Literal<?> result = IcebergTypeUtils.parseDefaultValue("not_a_number", DataTypes.INT());
+        assertThat(result).isNull();
+    }
+
+    @Test
+    public void testParseDefaultValueFunctionExpression() {
+        // Function expressions like AUTO_DECREMENT() cannot be parsed as numbers
+        Literal<?> result =
+                IcebergTypeUtils.parseDefaultValue("AUTO_DECREMENT()", DataTypes.BIGINT());
+        assertThat(result).isNull();
+    }
+}


Reply via email to