This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 1388cf990 [FLINK-34877][cdc] Support type cast conversion in pipeline 
transform
1388cf990 is described below

commit 1388cf99061b36f398aaf593a634a12e7b784c7f
Author: Wink <[email protected]>
AuthorDate: Wed Jul 31 23:32:39 2024 +0800

    [FLINK-34877][cdc] Support type cast conversion in pipeline transform
    
    This closes #3357.
---
 .../cdc/runtime/functions/SystemFunctionUtils.java |  83 +++
 .../flink/cdc/runtime/parser/JaninoCompiler.java   |  66 ++
 .../data/writer/AbstractBinaryWriter.java          |   2 +-
 .../transform/TransformDataOperatorTest.java       | 663 +++++++++++++++++++++
 .../cdc/runtime/parser/TransformParserTest.java    |  28 +
 5 files changed, 841 insertions(+), 1 deletion(-)

diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index f3dae8c1a..34c299567 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigDecimal;
+import java.math.MathContext;
 import java.math.RoundingMode;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -510,4 +511,86 @@ public class SystemFunctionUtils {
         }
         return null;
     }
+
+    public static String castToString(Object object) {
+        if (object == null) {
+            return null;
+        }
+        return object.toString();
+    }
+
+    public static Byte castToByte(Object object) {
+        if (object == null) {
+            return null;
+        }
+        return Byte.valueOf(castObjectIntoString(object));
+    }
+
+    public static Boolean castToBoolean(Object object) {
+        if (object == null) {
+            return null;
+        }
+        if (object instanceof Byte
+                || object instanceof Short
+                || object instanceof Integer
+                || object instanceof Long
+                || object instanceof Float
+                || object instanceof Double
+                || object instanceof BigDecimal) {
+            return !object.equals(0);
+        }
+        return Boolean.valueOf(castToString(object));
+    }
+
+    public static Short castToShort(Object object) {
+        if (object == null) {
+            return null;
+        }
+        return Short.valueOf(castObjectIntoString(object));
+    }
+
+    public static Integer castToInteger(Object object) {
+        if (object == null) {
+            return null;
+        }
+        return Integer.valueOf(castObjectIntoString(object));
+    }
+
+    public static Long castToLong(Object object) {
+        if (object == null) {
+            return null;
+        }
+        return Long.valueOf(castObjectIntoString(object));
+    }
+
+    public static Float castToFloat(Object object) {
+        if (object == null) {
+            return null;
+        }
+        return Float.valueOf(castObjectIntoString(object));
+    }
+
+    public static Double castToDouble(Object object) {
+        if (object == null) {
+            return null;
+        }
+        return Double.valueOf(castObjectIntoString(object));
+    }
+
+    public static BigDecimal castToBigDecimal(Object object, int precision, 
int scale) {
+        if (object == null) {
+            return null;
+        }
+        BigDecimal bigDecimal =
+                new BigDecimal(castObjectIntoString(object), new 
MathContext(precision));
+        bigDecimal = bigDecimal.setScale(scale, BigDecimal.ROUND_HALF_UP);
+        return bigDecimal;
+    }
+
+    private static String castObjectIntoString(Object object) {
+        if (object instanceof Boolean) {
+            return Boolean.valueOf(castToString(object)) ? "1" : "0";
+        }
+        return String.valueOf(object);
+    }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index 5af9755ed..2dd1b8402 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -22,7 +22,9 @@ import org.apache.flink.api.common.io.ParseException;
 import org.apache.flink.cdc.common.utils.StringUtils;
 
 import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlBasicTypeNameSpec;
 import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
@@ -228,6 +230,8 @@ public class JaninoCompiler {
             case LESS_THAN_OR_EQUAL:
             case GREATER_THAN_OR_EQUAL:
                 return generateBinaryOperation(sqlBasicCall, atoms, 
sqlBasicCall.getKind().sql);
+            case CAST:
+                return generateCastOperation(sqlBasicCall, atoms);
             case OTHER:
                 return generateOtherOperation(sqlBasicCall, atoms);
             default:
@@ -256,6 +260,16 @@ public class JaninoCompiler {
                 Location.NOWHERE, null, 
StringUtils.convertToCamelCase("VALUE_EQUALS"), atoms);
     }
 
+    private static Java.Rvalue generateCastOperation(
+            SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+        if (atoms.length != 1) {
+            throw new ParseException("Unrecognized expression: " + 
sqlBasicCall.toString());
+        }
+        List<SqlNode> operandList = sqlBasicCall.getOperandList();
+        SqlDataTypeSpec sqlDataTypeSpec = (SqlDataTypeSpec) operandList.get(1);
+        return generateTypeConvertMethod(sqlDataTypeSpec, atoms);
+    }
+
     private static Java.Rvalue generateOtherOperation(
             SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
         if (sqlBasicCall.getOperator().getName().equals("||")) {
@@ -298,4 +312,56 @@ public class JaninoCompiler {
                 StringUtils.convertToCamelCase(operationName),
                 timestampFunctionParam.toArray(new Java.Rvalue[0]));
     }
+
+    private static Java.Rvalue generateTypeConvertMethod(
+            SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] atoms) {
+        switch (sqlDataTypeSpec.getTypeName().getSimple().toUpperCase()) {
+            case "BOOLEAN":
+                return new Java.MethodInvocation(Location.NOWHERE, null, 
"castToBoolean", atoms);
+            case "TINYINT":
+                return new Java.MethodInvocation(Location.NOWHERE, null, 
"castToByte", atoms);
+            case "SMALLINT":
+                return new Java.MethodInvocation(Location.NOWHERE, null, 
"castToShort", atoms);
+            case "INTEGER":
+                return new Java.MethodInvocation(Location.NOWHERE, null, 
"castToInteger", atoms);
+            case "BIGINT":
+                return new Java.MethodInvocation(Location.NOWHERE, null, 
"castToLong", atoms);
+            case "FLOAT":
+                return new Java.MethodInvocation(Location.NOWHERE, null, 
"castToFloat", atoms);
+            case "DOUBLE":
+                return new Java.MethodInvocation(Location.NOWHERE, null, 
"castToDouble", atoms);
+            case "DECIMAL":
+                int precision = 10;
+                int scale = 0;
+                if (sqlDataTypeSpec.getTypeNameSpec() instanceof 
SqlBasicTypeNameSpec) {
+                    SqlBasicTypeNameSpec typeNameSpec =
+                            (SqlBasicTypeNameSpec) 
sqlDataTypeSpec.getTypeNameSpec();
+                    if (typeNameSpec.getPrecision() > -1) {
+                        precision = typeNameSpec.getPrecision();
+                    }
+                    if (typeNameSpec.getScale() > -1) {
+                        scale = typeNameSpec.getScale();
+                    }
+                }
+                List<Java.Rvalue> newAtoms = new 
ArrayList<>(Arrays.asList(atoms));
+                newAtoms.add(
+                        new Java.AmbiguousName(
+                                Location.NOWHERE, new String[] 
{String.valueOf(precision)}));
+                newAtoms.add(
+                        new Java.AmbiguousName(
+                                Location.NOWHERE, new String[] 
{String.valueOf(scale)}));
+                return new Java.MethodInvocation(
+                        Location.NOWHERE,
+                        null,
+                        "castToBigDecimal",
+                        newAtoms.toArray(new Java.Rvalue[0]));
+            case "CHAR":
+            case "VARCHAR":
+            case "STRING":
+                return new Java.MethodInvocation(Location.NOWHERE, null, 
"castToString", atoms);
+            default:
+                throw new ParseException(
+                        "Unsupported data type cast: " + 
sqlDataTypeSpec.toString());
+        }
+    }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
index acf3dac62..1422935ea 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
@@ -138,7 +138,7 @@ abstract class AbstractBinaryWriter implements BinaryWriter 
{
 
     @Override
     public void writeDecimal(int pos, DecimalData value, int precision) {
-        assert value == null || (value.precision() == precision);
+        assert value == null || (value.precision() <= precision);
 
         if (DecimalData.isCompact(precision)) {
             assert value != null;
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
index 09d12e823..2213127a4 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
@@ -115,6 +115,43 @@ public class TransformDataOperatorTest {
                     .primaryKey("col1")
                     .build();
 
+    private static final TableId NULL_TABLEID =
+            TableId.tableId("my_company", "my_branch", "data_null");
+    private static final Schema NULL_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("colString", DataTypes.STRING())
+                    .physicalColumn("nullInt", DataTypes.INT())
+                    .physicalColumn("nullBoolean", DataTypes.BOOLEAN())
+                    .physicalColumn("nullTinyint", DataTypes.TINYINT())
+                    .physicalColumn("nullSmallint", DataTypes.SMALLINT())
+                    .physicalColumn("nullBigint", DataTypes.BIGINT())
+                    .physicalColumn("nullFloat", DataTypes.FLOAT())
+                    .physicalColumn("nullDouble", DataTypes.DOUBLE())
+                    .physicalColumn("nullChar", DataTypes.CHAR(1))
+                    .physicalColumn("nullVarchar", DataTypes.VARCHAR(1))
+                    .physicalColumn("nullDecimal", DataTypes.DECIMAL(4, 2))
+                    .primaryKey("col1")
+                    .build();
+
+    private static final TableId CAST_TABLEID =
+            TableId.tableId("my_company", "my_branch", "data_cast");
+    private static final Schema CAST_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("castInt", DataTypes.INT())
+                    .physicalColumn("castBoolean", DataTypes.BOOLEAN())
+                    .physicalColumn("castTinyint", DataTypes.TINYINT())
+                    .physicalColumn("castSmallint", DataTypes.SMALLINT())
+                    .physicalColumn("castBigint", DataTypes.BIGINT())
+                    .physicalColumn("castFloat", DataTypes.FLOAT())
+                    .physicalColumn("castDouble", DataTypes.DOUBLE())
+                    .physicalColumn("castChar", DataTypes.CHAR(1))
+                    .physicalColumn("castVarchar", DataTypes.VARCHAR(1))
+                    .physicalColumn("castDecimal", DataTypes.DECIMAL(4, 2))
+                    .primaryKey("col1")
+                    .build();
+
     private static final TableId CONDITION_TABLEID =
             TableId.tableId("my_company", "my_branch", "condition_table");
     private static final Schema CONDITION_SCHEMA =
@@ -555,6 +592,612 @@ public class TransformDataOperatorTest {
                 .isEqualTo(new StreamRecord<>(insertEventExpect));
     }
 
+    @Test
+    void testNullCastTransform() throws Exception {
+        TransformDataOperator transform =
+                TransformDataOperator.newBuilder()
+                        .addTransform(
+                                NULL_TABLEID.identifier(),
+                                "col1"
+                                        + ",colString"
+                                        + ",cast(colString as int) as nullInt"
+                                        + ",cast(colString as boolean) as 
nullBoolean"
+                                        + ",cast(colString as tinyint) as 
nullTinyint"
+                                        + ",cast(colString as smallint) as 
nullSmallint"
+                                        + ",cast(colString as bigint) as 
nullBigint"
+                                        + ",cast(colString as float) as 
nullFloat"
+                                        + ",cast(colString as double) as 
nullDouble"
+                                        + ",cast(colString as char) as 
nullChar"
+                                        + ",cast(colString as varchar) as 
nullVarchar"
+                                        + ",cast(colString as DECIMAL(4,2)) as 
nullDecimal",
+                                null)
+                        .build();
+        EventOperatorTestHarness<TransformDataOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent = new CreateTableEvent(NULL_TABLEID, 
NULL_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) 
NULL_SCHEMA.toRowDataType()));
+        // Insert
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        NULL_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                }));
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(new 
CreateTableEvent(NULL_TABLEID, NULL_SCHEMA)));
+        transform.processElement(new StreamRecord<>(insertEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEvent));
+    }
+
+    @Test
+    void testCastTransform() throws Exception {
+        TransformDataOperator transform =
+                TransformDataOperator.newBuilder()
+                        .addTransform(
+                                CAST_TABLEID.identifier(),
+                                "col1"
+                                        + ",cast(col1 as int) as castInt"
+                                        + ",cast(col1 as boolean) as 
castBoolean"
+                                        + ",cast(col1 as tinyint) as 
castTinyint"
+                                        + ",cast(col1 as smallint) as 
castSmallint"
+                                        + ",cast(col1 as bigint) as castBigint"
+                                        + ",cast(col1 as float) as castFloat"
+                                        + ",cast(col1 as double) as castDouble"
+                                        + ",cast(col1 as char) as castChar"
+                                        + ",cast(col1 as varchar) as 
castVarchar"
+                                        + ",cast(col1 as DECIMAL(4,2)) as 
castDecimal",
+                                "col1 = '1'")
+                        .addTransform(
+                                CAST_TABLEID.identifier(),
+                                "col1"
+                                        + ",cast(castInt as int) as castInt"
+                                        + ",cast(castInt as boolean) as 
castBoolean"
+                                        + ",cast(castInt as tinyint) as 
castTinyint"
+                                        + ",cast(castInt as smallint) as 
castSmallint"
+                                        + ",cast(castInt as bigint) as 
castBigint"
+                                        + ",cast(castInt as float) as 
castFloat"
+                                        + ",cast(castInt as double) as 
castDouble"
+                                        + ",cast(castInt as char) as castChar"
+                                        + ",cast(castInt as varchar) as 
castVarchar"
+                                        + ",cast(castInt as DECIMAL(4,2)) as 
castDecimal",
+                                "col1 = '2'")
+                        .addTransform(
+                                CAST_TABLEID.identifier(),
+                                "col1"
+                                        + ",cast(castBoolean as int) as 
castInt"
+                                        + ",cast(castBoolean as boolean) as 
castBoolean"
+                                        + ",cast(castBoolean as tinyint) as 
castTinyint"
+                                        + ",cast(castBoolean as smallint) as 
castSmallint"
+                                        + ",cast(castBoolean as bigint) as 
castBigint"
+                                        + ",cast(castBoolean as float) as 
castFloat"
+                                        + ",cast(castBoolean as double) as 
castDouble"
+                                        + ",cast(castBoolean as char) as 
castChar"
+                                        + ",cast(castBoolean as varchar) as 
castVarchar"
+                                        + ",cast(castBoolean as DECIMAL(4,2)) 
as castDecimal",
+                                "col1 = '3'")
+                        .addTransform(
+                                CAST_TABLEID.identifier(),
+                                "col1"
+                                        + ",cast(castTinyint as int) as 
castInt"
+                                        + ",cast(castTinyint as boolean) as 
castBoolean"
+                                        + ",cast(castTinyint as tinyint) as 
castTinyint"
+                                        + ",cast(castTinyint as smallint) as 
castSmallint"
+                                        + ",cast(castTinyint as bigint) as 
castBigint"
+                                        + ",cast(castTinyint as float) as 
castFloat"
+                                        + ",cast(castTinyint as double) as 
castDouble"
+                                        + ",cast(castTinyint as char) as 
castChar"
+                                        + ",cast(castTinyint as varchar) as 
castVarchar"
+                                        + ",cast(castTinyint as DECIMAL(4,2)) 
as castDecimal",
+                                "col1 = '4'")
+                        .addTransform(
+                                CAST_TABLEID.identifier(),
+                                "col1"
+                                        + ",cast(castSmallint as int) as 
castInt"
+                                        + ",cast(castSmallint as boolean) as 
castBoolean"
+                                        + ",cast(castSmallint as tinyint) as 
castTinyint"
+                                        + ",cast(castSmallint as smallint) as 
castSmallint"
+                                        + ",cast(castSmallint as bigint) as 
castBigint"
+                                        + ",cast(castSmallint as float) as 
castFloat"
+                                        + ",cast(castSmallint as double) as 
castDouble"
+                                        + ",cast(castSmallint as char) as 
castChar"
+                                        + ",cast(castSmallint as varchar) as 
castVarchar"
+                                        + ",cast(castSmallint as DECIMAL(4,2)) 
as castDecimal",
+                                "col1 = '5'")
+                        .addTransform(
+                                CAST_TABLEID.identifier(),
+                                "col1"
+                                        + ",cast(castBigint as int) as castInt"
+                                        + ",cast(castBigint as boolean) as 
castBoolean"
+                                        + ",cast(castBigint as tinyint) as 
castTinyint"
+                                        + ",cast(castBigint as smallint) as 
castSmallint"
+                                        + ",cast(castBigint as bigint) as 
castBigint"
+                                        + ",cast(castBigint as float) as 
castFloat"
+                                        + ",cast(castBigint as double) as 
castDouble"
+                                        + ",cast(castBigint as char) as 
castChar"
+                                        + ",cast(castBigint as varchar) as 
castVarchar"
+                                        + ",cast(castBigint as DECIMAL(4,2)) 
as castDecimal",
+                                "col1 = '6'")
+                        .addTransform(
+                                CAST_TABLEID.identifier(),
+                                "col1"
+                                        + ",castInt"
+                                        + ",cast(castFloat as boolean) as 
castBoolean"
+                                        + ",castTinyint"
+                                        + ",castSmallint"
+                                        + ",castBigint"
+                                        + ",cast(castFloat as float) as 
castFloat"
+                                        + ",cast(castFloat as double) as 
castDouble"
+                                        + ",cast(castFloat as char) as 
castChar"
+                                        + ",cast(castFloat as varchar) as 
castVarchar"
+                                        + ",cast(castFloat as DECIMAL(4,2)) as 
castDecimal",
+                                "col1 = '7'")
+                        .addTransform(
+                                CAST_TABLEID.identifier(),
+                                "col1"
+                                        + ",castInt"
+                                        + ",cast(castDouble as boolean) as 
castBoolean"
+                                        + ",castTinyint"
+                                        + ",castSmallint"
+                                        + ",castBigint"
+                                        + ",cast(castDouble as float) as 
castFloat"
+                                        + ",cast(castDouble as double) as 
castDouble"
+                                        + ",cast(castDouble as char) as 
castChar"
+                                        + ",cast(castDouble as varchar) as 
castVarchar"
+                                        + ",cast(castDouble as DECIMAL(4,2)) 
as castDecimal",
+                                "col1 = '8'")
+                        .addTransform(
+                                CAST_TABLEID.identifier(),
+                                "col1"
+                                        + ",castInt"
+                                        + ",cast(castDecimal as boolean) as 
castBoolean"
+                                        + ",castTinyint"
+                                        + ",castSmallint"
+                                        + ",castBigint"
+                                        + ",cast(castDecimal as float) as 
castFloat"
+                                        + ",cast(castDecimal as double) as 
castDouble"
+                                        + ",cast(castDecimal as char) as 
castChar"
+                                        + ",cast(castDecimal as varchar) as 
castVarchar"
+                                        + ",cast(castDecimal as DECIMAL(4,2)) 
as castDecimal",
+                                "col1 = '9'")
+                        .build();
+        EventOperatorTestHarness<TransformDataOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent = new CreateTableEvent(CAST_TABLEID, 
CAST_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) 
CAST_SCHEMA.toRowDataType()));
+        // Insert
+        DataChangeEvent insertEvent1 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                }));
+        DataChangeEvent insertEventExpect1 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"),
+                                    new Integer(1),
+                                    new Boolean(false),
+                                    new Byte("1"),
+                                    new Short("1"),
+                                    new Long(1),
+                                    new Float(1.0f),
+                                    new Double(1.0d),
+                                    new BinaryStringData("1"),
+                                    new BinaryStringData("1"),
+                                    DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                }));
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(new 
CreateTableEvent(CAST_TABLEID, CAST_SCHEMA)));
+        transform.processElement(new StreamRecord<>(insertEvent1));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect1));
+        DataChangeEvent insertEvent2 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("2"),
+                                    new Integer(1),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                }));
+        DataChangeEvent insertEventExpect2 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("2"),
+                                    new Integer(1),
+                                    new Boolean(true),
+                                    new Byte("1"),
+                                    new Short("1"),
+                                    new Long(1),
+                                    new Float(1.0f),
+                                    new Double(1.0d),
+                                    new BinaryStringData("1"),
+                                    new BinaryStringData("1"),
+                                    DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                }));
+        transform.processElement(new StreamRecord<>(insertEvent2));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect2));
+        DataChangeEvent insertEvent3 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("3"),
+                                    null,
+                                    new Boolean(true),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                }));
+        DataChangeEvent insertEventExpect3 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("3"),
+                                    new Integer(1),
+                                    new Boolean(true),
+                                    new Byte("1"),
+                                    new Short("1"),
+                                    new Long(1),
+                                    new Float(1.0f),
+                                    new Double(1.0d),
+                                    new BinaryStringData("true"),
+                                    new BinaryStringData("true"),
+                                    DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                }));
+        transform.processElement(new StreamRecord<>(insertEvent3));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect3));
+        DataChangeEvent insertEvent4 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("4"),
+                                    null,
+                                    null,
+                                    new Byte("1"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                }));
+        DataChangeEvent insertEventExpect4 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("4"),
+                                    new Integer(1),
+                                    new Boolean(true),
+                                    new Byte("1"),
+                                    new Short("1"),
+                                    new Long(1),
+                                    new Float(1.0f),
+                                    new Double(1.0d),
+                                    new BinaryStringData("1"),
+                                    new BinaryStringData("1"),
+                                    DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                }));
+        transform.processElement(new StreamRecord<>(insertEvent4));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect4));
+        DataChangeEvent insertEvent5 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("5"),
+                                    null,
+                                    null,
+                                    null,
+                                    new Short("1"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                }));
+        DataChangeEvent insertEventExpect5 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("5"),
+                                    new Integer(1),
+                                    new Boolean(true),
+                                    new Byte("1"),
+                                    new Short("1"),
+                                    new Long(1),
+                                    new Float(1.0f),
+                                    new Double(1.0d),
+                                    new BinaryStringData("1"),
+                                    new BinaryStringData("1"),
+                                    DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                }));
+        transform.processElement(new StreamRecord<>(insertEvent5));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect5));
+        DataChangeEvent insertEvent6 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("6"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    new Long(1),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                }));
+        DataChangeEvent insertEventExpect6 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("6"),
+                                    new Integer(1),
+                                    new Boolean(true),
+                                    new Byte("1"),
+                                    new Short("1"),
+                                    new Long(1),
+                                    new Float(1.0f),
+                                    new Double(1.0d),
+                                    new BinaryStringData("1"),
+                                    new BinaryStringData("1"),
+                                    DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                }));
+        transform.processElement(new StreamRecord<>(insertEvent6));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect6));
+        DataChangeEvent insertEvent7 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("7"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    new Float(1.0f),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                }));
+        DataChangeEvent insertEventExpect7 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("7"),
+                                    null,
+                                    new Boolean(true),
+                                    null,
+                                    null,
+                                    null,
+                                    new Float(1.0f),
+                                    new Double(1.0d),
+                                    new BinaryStringData("1.0"),
+                                    new BinaryStringData("1.0"),
+                                    DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                }));
+        transform.processElement(new StreamRecord<>(insertEvent7));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect7));
+        DataChangeEvent insertEvent8 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("8"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    new Double(1.0d),
+                                    null,
+                                    null,
+                                    null,
+                                }));
+        DataChangeEvent insertEventExpect8 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("8"),
+                                    null,
+                                    new Boolean(true),
+                                    null,
+                                    null,
+                                    null,
+                                    new Float(1.0f),
+                                    new Double(1.0d),
+                                    new BinaryStringData("1.0"),
+                                    new BinaryStringData("1.0"),
+                                    DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                }));
+        transform.processElement(new StreamRecord<>(insertEvent8));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect8));
+        DataChangeEvent insertEvent9 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("9"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                }));
+        DataChangeEvent insertEventExpect9 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("9"),
+                                    null,
+                                    new Boolean(true),
+                                    null,
+                                    null,
+                                    null,
+                                    new Float(1.0f),
+                                    new Double(1.0d),
+                                    new BinaryStringData("1.00"),
+                                    new BinaryStringData("1.00"),
+                                    DecimalData.fromBigDecimal(new 
BigDecimal(1.0), 4, 2),
+                                }));
+        transform.processElement(new StreamRecord<>(insertEvent9));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect9));
+    }
+
+    @Test
+    void testCastErrorTransform() throws Exception {
+        TransformDataOperator transform =
+                TransformDataOperator.newBuilder()
+                        .addTransform(
+                                CAST_TABLEID.identifier(),
+                                "col1"
+                                        + ",cast(castFloat as int) as castInt"
+                                        + ",cast(castFloat as boolean) as 
castBoolean"
+                                        + ",cast(castFloat as tinyint) as 
castTinyint"
+                                        + ",cast(castFloat as smallint) as 
castSmallint"
+                                        + ",cast(castFloat as bigint) as 
castBigint"
+                                        + ",cast(castFloat as float) as 
castFloat"
+                                        + ",cast(castFloat as double) as 
castDouble"
+                                        + ",cast(castFloat as char) as 
castChar"
+                                        + ",cast(castFloat as varchar) as 
castVarchar"
+                                        + ",cast(castFloat as DECIMAL(4,2)) as 
castDecimal",
+                                "col1 = '1'")
+                        .build();
+        EventOperatorTestHarness<TransformDataOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent = new CreateTableEvent(CAST_TABLEID, 
CAST_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) 
CAST_SCHEMA.toRowDataType()));
+        // Insert
+        DataChangeEvent insertEvent1 =
+                DataChangeEvent.insertEvent(
+                        CAST_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    new Float(1.0f),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                }));
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(new 
CreateTableEvent(CAST_TABLEID, CAST_SCHEMA)));
+        Assertions.assertThatThrownBy(
+                        () -> {
+                            transform.processElement(new 
StreamRecord<>(insertEvent1));
+                        })
+                .isExactlyInstanceOf(RuntimeException.class)
+                .hasRootCauseInstanceOf(NumberFormatException.class)
+                .hasRootCauseMessage("For input string: \"1.0\"");
+    }
+
     @Test
     void testBuildInFunctionTransform() throws Exception {
         testExpressionConditionTransform(
@@ -592,6 +1235,26 @@ public class TransformDataOperatorTest {
                 "case 1 when 1 then 'a' when 2 then 'b' else 'c' end = 'a'");
         testExpressionConditionTransform("case col1 when '1' then true else 
false end");
         testExpressionConditionTransform("case when col1 = '1' then true else 
false end");
+        testExpressionConditionTransform("cast(col1 as int) = 1");
+        testExpressionConditionTransform("cast('true' as boolean)");
+        testExpressionConditionTransform("cast(col1 as tinyint) = cast(1 as 
tinyint)");
+        testExpressionConditionTransform("cast(col1 as smallint) = cast(1 as 
smallint)");
+        testExpressionConditionTransform("cast(col1 as bigint) = cast(1 as 
bigint)");
+        testExpressionConditionTransform("cast(col1 as float) = cast(1 as 
float)");
+        testExpressionConditionTransform("cast(col1 as double) = cast(1 as 
double)");
+        testExpressionConditionTransform("cast('1' as char) = '1'");
+        testExpressionConditionTransform("cast(col1 as varchar) = '1'");
+        testExpressionConditionTransform("cast(col1 as DECIMAL(4,2)) = 
cast(1.0 as DECIMAL(4,2))");
+        testExpressionConditionTransform("cast(null as int) is null");
+        testExpressionConditionTransform("cast(null as boolean) is null");
+        testExpressionConditionTransform("cast(null as tinyint) is null");
+        testExpressionConditionTransform("cast(null as smallint) is null");
+        testExpressionConditionTransform("cast(null as bigint) is null");
+        testExpressionConditionTransform("cast(null as float) is null");
+        testExpressionConditionTransform("cast(null as double) is null");
+        testExpressionConditionTransform("cast(null as char) is null");
+        testExpressionConditionTransform("cast(null as varchar) is null");
+        testExpressionConditionTransform("cast(null as DECIMAL(4,2)) is null");
     }
 
     private void testExpressionConditionTransform(String expression) throws 
Exception {
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 9dffeb84a..f81c4a92c 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -264,6 +264,34 @@ public class TransformParserTest {
         testFilterExpression(
                 "case when id = 1 then 'a' when id = 2 then 'b' else 'c' end",
                 "(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" : 
\"c\")");
+        testFilterExpression(
+                "case id when 1 then 'a' when 2 then 'b' else 'c' end",
+                "(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" : 
\"c\")");
+        testFilterExpression(
+                "case when id = 1 then 'a' when id = 2 then 'b' else 'c' end",
+                "(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" : 
\"c\")");
+        testFilterExpression("cast(id||'0' as int)", "castToInteger(concat(id, 
\"0\"))");
+        testFilterExpression("cast(1 as string)", "castToString(1)");
+        testFilterExpression("cast(1 as boolean)", "castToBoolean(1)");
+        testFilterExpression("cast(1 as tinyint)", "castToByte(1)");
+        testFilterExpression("cast(1 as smallint)", "castToShort(1)");
+        testFilterExpression("cast(1 as bigint)", "castToLong(1)");
+        testFilterExpression("cast(1 as float)", "castToFloat(1)");
+        testFilterExpression("cast(1 as double)", "castToDouble(1)");
+        testFilterExpression("cast(1 as decimal)", "castToBigDecimal(1, 10, 
0)");
+        testFilterExpression("cast(1 as char)", "castToString(1)");
+        testFilterExpression("cast(1 as varchar)", "castToString(1)");
+        testFilterExpression("cast(null as int)", "castToInteger(null)");
+        testFilterExpression("cast(null as string)", "castToString(null)");
+        testFilterExpression("cast(null as boolean)", "castToBoolean(null)");
+        testFilterExpression("cast(null as tinyint)", "castToByte(null)");
+        testFilterExpression("cast(null as smallint)", "castToShort(null)");
+        testFilterExpression("cast(null as bigint)", "castToLong(null)");
+        testFilterExpression("cast(null as float)", "castToFloat(null)");
+        testFilterExpression("cast(null as double)", "castToDouble(null)");
+        testFilterExpression("cast(null as decimal)", "castToBigDecimal(null, 
10, 0)");
+        testFilterExpression("cast(null as char)", "castToString(null)");
+        testFilterExpression("cast(null as varchar)", "castToString(null)");
     }
 
     private void testFilterExpression(String expression, String 
expressionExpect) {

Reply via email to