Copilot commented on code in PR #4277:
URL: https://github.com/apache/flink-cdc/pull/4277#discussion_r2845714553


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java:
##########
@@ -53,6 +60,66 @@ public static Types.NestedField 
convertCdcColumnToIcebergField(
                 column.getComment());
     }
 
+    /**
+     * Parse a CDC default value expression string into an Iceberg {@link 
Literal}.
+     *
+     * @return the parsed Literal, or null if the expression is null or cannot 
be parsed for the
+     *     given type.
+     */
+    @Nullable
+    public static Literal<?> parseDefaultValue(
+            @Nullable String defaultValueExpression, DataType cdcType) {
+        if (defaultValueExpression == null) {
+            return null;
+        }
+        try {
+            switch (cdcType.getTypeRoot()) {

Review Comment:
   `defaultValueExpression` is parsed without trimming. If the upstream default 
contains leading/trailing whitespace (common in some JDBC metadata), numeric 
and boolean parsing will fail and silently skip defaults. Consider applying 
`trim()` (and possibly handling surrounding parentheses) before the switch to 
make parsing more robust.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java:
##########
@@ -53,6 +60,66 @@ public static Types.NestedField 
convertCdcColumnToIcebergField(
                 column.getComment());
     }
 
+    /**
+     * Parse a CDC default value expression string into an Iceberg {@link 
Literal}.
+     *
+     * @return the parsed Literal, or null if the expression is null or cannot 
be parsed for the
+     *     given type.
+     */
+    @Nullable
+    public static Literal<?> parseDefaultValue(
+            @Nullable String defaultValueExpression, DataType cdcType) {
+        if (defaultValueExpression == null) {
+            return null;
+        }
+        try {
+            switch (cdcType.getTypeRoot()) {
+                case CHAR:
+                case VARCHAR:
+                    return Literal.of(defaultValueExpression);
+                case BOOLEAN:

Review Comment:
   For CHAR/VARCHAR defaults, this returns the raw `defaultValueExpression` 
as-is. Some sources (e.g., Oracle/Postgres) may provide quoted literals (e.g., 
`'active'`) or expressions/casts (e.g., `'active'::varchar`), which would end 
up persisted verbatim as an Iceberg default. Consider normalizing string 
literals (strip surrounding quotes/escape) and rejecting non-literal 
expressions (casts/functions) to avoid writing incorrect defaults.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java:
##########
@@ -212,24 +237,28 @@ private void applyAddColumnEventWithPosition(Table table, 
AddColumnEvent event)
                         FlinkSchemaUtil.convert(
                                 
DataTypeUtils.toFlinkDataType(addColumn.getType())
                                         .getLogicalType());
+                Literal<?> defaultValue =
+                        IcebergTypeUtils.parseDefaultValue(
+                                addColumn.getDefaultValueExpression(), 
addColumn.getType());
+                if (defaultValue != null) {
+                    updateSchema.addColumn(columnName, icebergType, 
columnComment, defaultValue);
+                } else {
+                    updateSchema.addColumn(columnName, icebergType, 
columnComment);
+                }
                 switch (columnWithPosition.getPosition()) {
                     case FIRST:
-                        updateSchema.addColumn(columnName, icebergType, 
columnComment);
-                        table.updateSchema().moveFirst(columnName);
+                        updateSchema.moveFirst(columnName);
                         break;
                     case LAST:
-                        updateSchema.addColumn(columnName, icebergType, 
columnComment);
                         break;
                     case BEFORE:

Review Comment:
   In the BEFORE case, `columnWithPosition.getExistedColumnName()` is used 
without a null check, while AFTER explicitly validates it. Since 
`ColumnWithPosition` allows a null `existedColumnName`, consider adding a 
`checkNotNull` for BEFORE as well (matching AFTER) to fail fast with a clearer 
error if an invalid event is provided.
   ```suggestion
                       case BEFORE:
                           checkNotNull(
                                   columnWithPosition.getExistedColumnName(),
                                   "Existing column name must be provided for 
BEFORE position");
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java:
##########
@@ -53,6 +60,66 @@ public static Types.NestedField 
convertCdcColumnToIcebergField(
                 column.getComment());
     }
 
+    /**
+     * Parse a CDC default value expression string into an Iceberg {@link 
Literal}.
+     *
+     * @return the parsed Literal, or null if the expression is null or cannot 
be parsed for the
+     *     given type.
+     */
+    @Nullable
+    public static Literal<?> parseDefaultValue(
+            @Nullable String defaultValueExpression, DataType cdcType) {
+        if (defaultValueExpression == null) {
+            return null;
+        }
+        try {
+            switch (cdcType.getTypeRoot()) {
+                case CHAR:
+                case VARCHAR:
+                    return Literal.of(defaultValueExpression);
+                case BOOLEAN:
+                    if ("true".equalsIgnoreCase(defaultValueExpression)) {
+                        return Literal.of(true);
+                    } else if 
("false".equalsIgnoreCase(defaultValueExpression)) {
+                        return Literal.of(false);
+                    } else {
+                        LOG.warn(
+                                "Invalid boolean default value '{}', skipping 
default value.",
+                                defaultValueExpression);
+                        return null;
+                    }
+                case TINYINT:
+                case SMALLINT:
+                case INTEGER:
+                    return 
Literal.of(Integer.parseInt(defaultValueExpression));
+                case BIGINT:
+                    return Literal.of(Long.parseLong(defaultValueExpression));
+                case FLOAT:
+                    return 
Literal.of(Float.parseFloat(defaultValueExpression));
+                case DOUBLE:
+                    return 
Literal.of(Double.parseDouble(defaultValueExpression));
+                case DECIMAL:
+                    int scale = DataTypes.getScale(cdcType).orElse(0);
+                    return Literal.of(
+                            new java.math.BigDecimal(defaultValueExpression)
+                                    .setScale(scale, 
java.math.RoundingMode.HALF_UP));
+                default:
+                    LOG.warn(
+                            "Unsupported default value type {} for expression 
'{}', skipping default value.",
+                            cdcType.getTypeRoot(),
+                            defaultValueExpression);
+                    return null;

Review Comment:
   The `default` branch logs a WARN for every unsupported type/expression. 
Given the PR explicitly doesn't support DATE/TIME/TIMESTAMP defaults, this will 
likely emit frequent warnings for common defaults like `CURRENT_TIMESTAMP` 
during table creation/evolution. Consider lowering this to DEBUG or only 
logging when the expression looks like a constant literal but the type is 
unsupported, to reduce log noise in production.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to