This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 79e868b86 [FLINK-36741][transform] Fix the decimal precision and length lost during transform
79e868b86 is described below

commit 79e868b86483b66e83edc22f060e57c3a68bae9b
Author: Wink <809097...@qq.com>
AuthorDate: Wed Jan 15 11:01:39 2025 +0800

    [FLINK-36741][transform] Fix the decimal precision and length lost during transform
    
    This closes #3740
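
    The transform planner previously erased precision and length from
    DECIMAL, CHAR/VARCHAR, BINARY/VARBINARY, and TIMESTAMP(_LTZ) types when
    deriving UDF return types and Calcite row types. Decimal casts now go
    through DecimalData with an explicit overflow guard. Roughly, the guard
    added in SystemFunctionUtils#castToDecimalData behaves like this
    standalone Java sketch (illustrative only, not project code):

        import java.math.BigDecimal;
        import java.math.MathContext;
        import java.math.RoundingMode;

        public class DecimalCastSketch {
            // If rounding to the target scale pushes the value past the
            // declared precision, return null instead of emitting a value
            // that no longer fits DECIMAL(precision, scale).
            static BigDecimal castToDecimal(Object value, int precision, int scale) {
                BigDecimal result;
                try {
                    result = new BigDecimal(String.valueOf(value), new MathContext(precision))
                            .setScale(scale, RoundingMode.HALF_UP);
                } catch (NumberFormatException e) {
                    return null; // non-numeric input
                }
                return result.precision() > precision ? null : result;
            }

            public static void main(String[] args) {
                System.out.println(castToDecimal("12.3149", 10, 3));      // 12.315
                System.out.println(castToDecimal("12345678.999", 10, 3)); // null
            }
        }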
---
 .../flink/FlinkPipelineTransformITCase.java        |   2 +-
 .../cdc/composer/flink/FlinkPipelineUdfITCase.java | 108 +++++++++-
 .../values/sink/ValuesDataSinkHelper.java          |   5 +-
 .../java/precision/BinaryTypeReturningClass.java   |  34 +++
 .../java/precision/CharTypeReturningClass.java     |  35 +++
 .../DecimalTypeNonNullReturningClass.java          |  37 ++++
 .../java/precision/DecimalTypeReturningClass.java  |  37 ++++
 .../LocalZonedTimestampTypeReturningClass.java     |  35 +++
 .../precision/TimestampTypeReturningClass.java     |  35 +++
 .../precision/VarBinaryTypeReturningClass.java     |  34 +++
 .../java/precision/VarCharTypeReturningClass.java  |  34 +++
 .../scala/precision/BinaryTypeReturningClass.scala |  28 +++
 .../scala/precision/CharTypeReturningClass.scala   |  28 +++
 .../DecimalTypeNonNullReturningClass.scala         |  30 +++
 .../precision/DecimalTypeReturningClass.scala      |  31 +++
 .../LocalZonedTimestampTypeReturningClass.scala    |  29 +++
 .../precision/TimestampTypeReturningClass.scala    |  29 +++
 .../precision/VarBinaryTypeReturningClass.scala    |  28 +++
 .../precision/VarCharTypeReturningClass.scala      |  28 +++
 .../cdc/runtime/functions/SystemFunctionUtils.java |  82 ++++++++
 .../flink/cdc/runtime/parser/JaninoCompiler.java   |   4 +-
 .../flink/cdc/runtime/parser/TransformParser.java  |   4 +-
 .../runtime/parser/metadata/TransformTable.java    |  12 +-
 .../cdc/runtime/typeutils/DataTypeConverter.java   | 234 ++++++++++++++++++---
 .../cdc/runtime/parser/TransformParserTest.java    |  58 ++++-
 25 files changed, 972 insertions(+), 49 deletions(-)

diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 561097286..6364b6981 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -1979,7 +1979,7 @@ class FlinkPipelineTransformITCase {
                                                 + "2147483648 AS 
greater_than_int_max, "
                                                 + "-2147483648 AS int_min, "
                                                 + "-2147483649 AS 
less_than_int_min, "
-                                                + "CAST(1234567890123456789 AS DECIMAL(20, 0)) AS really_big_decimal",
+                                                + "CAST(1234567890123456789 AS DECIMAL(19, 0)) AS really_big_decimal",
                                         "CAST(id AS BIGINT) + 2147483648 > 
2147483649", // equivalent to id > 1
                                         null,
                                         null,
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
index f3161105e..03edd9069 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
@@ -841,7 +841,7 @@ public class FlinkPipelineUdfITCase {
     @ParameterizedTest
     @MethodSource("testParams")
     @Disabled("For manual test as there is a limit for quota.")
-    void testTransformWithModel(ValuesDataSink.SinkApi sinkApi) throws Exception {
+    void testTransformWithModel(ValuesDataSink.SinkApi sinkApi, String language) throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
 
         // Setup value source
@@ -909,6 +909,112 @@ public class FlinkPipelineUdfITCase {
                 .hasSize(9);
     }
 
+    @ParameterizedTest
+    @MethodSource("testParams")
+    void testComplicatedUdfReturnTypes(ValuesDataSink.SinkApi sinkApi, String language)
+            throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup transform
+        TransformDef transformDef =
+                new TransformDef(
+                        "default_namespace.default_schema.table1",
+                        "*, get_char() AS char_col, get_varchar() AS varchar_col, get_binary() AS binary_col, get_varbinary() AS varbinary_col, get_ts() AS ts_col, get_ts_ltz() AS ts_ltz_col, get_decimal() AS decimal_col, get_non_null() AS non_null_col",
+                        null,
+                        "col1",
+                        null,
+                        "key1=value1",
+                        "",
+                        null);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles");
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(transformDef),
+                        Arrays.asList(
+                                new UdfDef(
+                                        "get_char",
+                                        String.format(
+                                                "org.apache.flink.cdc.udf.examples.%s.precision.CharTypeReturningClass",
+                                                language)),
+                                new UdfDef(
+                                        "get_varchar",
+                                        String.format(
+                                                "org.apache.flink.cdc.udf.examples.%s.precision.VarCharTypeReturningClass",
+                                                language)),
+                                new UdfDef(
+                                        "get_binary",
+                                        String.format(
+                                                "org.apache.flink.cdc.udf.examples.%s.precision.BinaryTypeReturningClass",
+                                                language)),
+                                new UdfDef(
+                                        "get_varbinary",
+                                        String.format(
+                                                "org.apache.flink.cdc.udf.examples.%s.precision.VarBinaryTypeReturningClass",
+                                                language)),
+                                new UdfDef(
+                                        "get_ts",
+                                        String.format(
+                                                "org.apache.flink.cdc.udf.examples.%s.precision.TimestampTypeReturningClass",
+                                                language)),
+                                new UdfDef(
+                                        "get_ts_ltz",
+                                        String.format(
+                                                "org.apache.flink.cdc.udf.examples.%s.precision.LocalZonedTimestampTypeReturningClass",
+                                                language)),
+                                new UdfDef(
+                                        "get_decimal",
+                                        String.format(
+                                                "org.apache.flink.cdc.udf.examples.%s.precision.DecimalTypeReturningClass",
+                                                language)),
+                                new UdfDef(
+                                        "get_non_null",
+                                        String.format(
+                                                "org.apache.flink.cdc.udf.examples.%s.precision.DecimalTypeNonNullReturningClass",
+                                                language))),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(outputEvents)
+                .contains(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`char_col` STRING,`varchar_col` STRING,`binary_col` BINARY(17),`varbinary_col` VARBINARY(17),`ts_col` TIMESTAMP(2),`ts_ltz_col` TIMESTAMP_LTZ(2),`decimal_col` DECIMAL(10, 3),`non_null_col` DECIMAL(10, 3)}, primaryKeys=col1, options=({key1=value1})}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=INSERT, meta=()}",
+                        "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}",
+                        "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
+                        "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], after=[], op=DELETE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], after=[2, x, This is a string., This is a string., eHl6enk=, eHl6enk=, 1970-01-02T00:00, 1970-01-02T00:00, 12.315, 12.315], op=UPDATE, meta=()}");
+    }
+
     private static Stream<Arguments> testParams() {
         return Stream.of(
                 arguments(ValuesDataSink.SinkApi.SINK_FUNCTION, "java"),
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java
index 1d93899f9..d1941c3eb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java
@@ -67,10 +67,13 @@ public class ValuesDataSinkHelper {
         return fields.stream()
                 .map(
                         o -> {
+                            if (o == null) {
+                                return "null";
+                            }
                             if (o instanceof byte[]) {
                                 return BaseEncoding.base64().encode((byte[]) o);
                             } else {
-                                return o;
+                                return o.toString();
                             }
                         })
                 .collect(Collectors.toList());
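
The helper change above is what makes the new ITCase expectations printable:
null fields render as the literal "null", byte arrays as base64 ("xyzzy"
becomes "eHl6enk="), and everything else goes through toString(). A minimal
standalone sketch of the same mapping, assuming Guava's BaseEncoding as
imported by the helper:

    import com.google.common.io.BaseEncoding;

    class FieldRenderSketch {
        static String render(Object o) {
            if (o == null) {
                return "null"; // avoid NullPointerException on toString()
            }
            // byte[] has no readable toString(); base64 matches the
            // "eHl6enk=" values asserted in FlinkPipelineUdfITCase above.
            return o instanceof byte[]
                    ? BaseEncoding.base64().encode((byte[]) o)
                    : o.toString();
        }
    }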
diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/BinaryTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/BinaryTypeReturningClass.java
new file mode 100644
index 000000000..b1ab5e6ae
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/BinaryTypeReturningClass.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.java.precision;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+/** This is an example UDF class for testing purposes only. */
+public class BinaryTypeReturningClass implements UserDefinedFunction {
+    @Override
+    public DataType getReturnType() {
+        return DataTypes.BINARY(17);
+    }
+
+    public byte[] eval() {
+        return "xyzzy".getBytes();
+    }
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/CharTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/CharTypeReturningClass.java
new file mode 100644
index 000000000..df3994899
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/CharTypeReturningClass.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.java.precision;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+/** This is an example UDF class for testing purposes only. */
+public class CharTypeReturningClass implements UserDefinedFunction {
+
+    @Override
+    public DataType getReturnType() {
+        return DataTypes.CHAR(17);
+    }
+
+    public String eval() {
+        return "This is a string.";
+    }
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeNonNullReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeNonNullReturningClass.java
new file mode 100644
index 000000000..6b27ef24e
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeNonNullReturningClass.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.java.precision;
+
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import java.math.BigDecimal;
+
+/** This is an example UDF class for testing purposes only. */
+public class DecimalTypeNonNullReturningClass implements UserDefinedFunction {
+    @Override
+    public DataType getReturnType() {
+        return DataTypes.DECIMAL(10, 3).notNull();
+    }
+
+    public DecimalData eval() {
+        return DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3);
+    }
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeReturningClass.java
new file mode 100644
index 000000000..8e5bcd39a
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/DecimalTypeReturningClass.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.java.precision;
+
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import java.math.BigDecimal;
+
+/** This is an example UDF class for testing purposes only. */
+public class DecimalTypeReturningClass implements UserDefinedFunction {
+    @Override
+    public DataType getReturnType() {
+        return DataTypes.DECIMAL(10, 3);
+    }
+
+    public DecimalData eval() {
+        return DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3);
+    }
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/LocalZonedTimestampTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/LocalZonedTimestampTypeReturningClass.java
new file mode 100644
index 000000000..9231c12e2
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/LocalZonedTimestampTypeReturningClass.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.java.precision;
+
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+/** This is an example UDF class for testing purposes only. */
+public class LocalZonedTimestampTypeReturningClass implements UserDefinedFunction {
+    @Override
+    public DataType getReturnType() {
+        return DataTypes.TIMESTAMP_LTZ(2);
+    }
+
+    public LocalZonedTimestampData eval() {
+        return LocalZonedTimestampData.fromEpochMillis(24 * 60 * 60 * 1000);
+    }
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/TimestampTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/TimestampTypeReturningClass.java
new file mode 100644
index 000000000..22826c2e8
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/TimestampTypeReturningClass.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.java.precision;
+
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+/** This is an example UDF class for testing purposes only. */
+public class TimestampTypeReturningClass implements UserDefinedFunction {
+    @Override
+    public DataType getReturnType() {
+        return DataTypes.TIMESTAMP(2);
+    }
+
+    public TimestampData eval() {
+        return TimestampData.fromMillis(24 * 60 * 60 * 1000);
+    }
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/VarBinaryTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/VarBinaryTypeReturningClass.java
new file mode 100644
index 000000000..fdb3d54aa
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/VarBinaryTypeReturningClass.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.java.precision;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+/** This is an example UDF class for testing purposes only. */
+public class VarBinaryTypeReturningClass implements UserDefinedFunction {
+    @Override
+    public DataType getReturnType() {
+        return DataTypes.VARBINARY(17);
+    }
+
+    public byte[] eval() {
+        return "xyzzy".getBytes();
+    }
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/VarCharTypeReturningClass.java b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/VarCharTypeReturningClass.java
new file mode 100644
index 000000000..0e15fe80d
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/java/precision/VarCharTypeReturningClass.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.java.precision;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+/** This is an example UDF class for testing purposes only. */
+public class VarCharTypeReturningClass implements UserDefinedFunction {
+    @Override
+    public DataType getReturnType() {
+        return DataTypes.VARCHAR(17);
+    }
+
+    public String eval() {
+        return "This is a string.";
+    }
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/BinaryTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/BinaryTypeReturningClass.scala
new file mode 100644
index 000000000..6f76d0899
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/BinaryTypeReturningClass.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.scala.precision
+
+import org.apache.flink.cdc.common.types.DataType
+import org.apache.flink.cdc.common.types.DataTypes
+import org.apache.flink.cdc.common.udf.UserDefinedFunction
+
+/** This is an example UDF class for testing purposes only. */
+class BinaryTypeReturningClass extends UserDefinedFunction {
+  override def getReturnType: DataType = DataTypes.BINARY(17)
+  def eval: Array[Byte] = "xyzzy".getBytes
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/CharTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/CharTypeReturningClass.scala
new file mode 100644
index 000000000..dcb73862c
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/CharTypeReturningClass.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.scala.precision
+
+import org.apache.flink.cdc.common.types.DataType
+import org.apache.flink.cdc.common.types.DataTypes
+import org.apache.flink.cdc.common.udf.UserDefinedFunction
+
+/** This is an example UDF class for testing purposes only. */
+class CharTypeReturningClass extends UserDefinedFunction {
+  override def getReturnType: DataType = DataTypes.CHAR(17)
+  def eval = "This is a string."
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeNonNullReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeNonNullReturningClass.scala
new file mode 100644
index 000000000..2940ce200
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeNonNullReturningClass.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.scala.precision
+
+import org.apache.flink.cdc.common.data.DecimalData
+import org.apache.flink.cdc.common.types.{DataType, DataTypes}
+import org.apache.flink.cdc.common.udf.UserDefinedFunction
+
+import java.math.BigDecimal
+
+/** This is an example UDF class for testing purposes only. */
+class DecimalTypeNonNullReturningClass extends UserDefinedFunction {
+  override def getReturnType: DataType = DataTypes.DECIMAL(10, 3).notNull()
+  def eval: DecimalData = DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3)
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeReturningClass.scala
new file mode 100644
index 000000000..46cbe0215
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/DecimalTypeReturningClass.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.scala.precision
+
+import org.apache.flink.cdc.common.data.DecimalData
+import org.apache.flink.cdc.common.types.DataType
+import org.apache.flink.cdc.common.types.DataTypes
+import org.apache.flink.cdc.common.udf.UserDefinedFunction
+
+import java.math.BigDecimal
+
+/** This is an example UDF class for testing purposes only. */
+class DecimalTypeReturningClass extends UserDefinedFunction {
+  override def getReturnType: DataType = DataTypes.DECIMAL(10, 3)
+  def eval: DecimalData = DecimalData.fromBigDecimal(new BigDecimal("12.315"), 10, 3)
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/LocalZonedTimestampTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/LocalZonedTimestampTypeReturningClass.scala
new file mode 100644
index 000000000..dba554158
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/LocalZonedTimestampTypeReturningClass.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.scala.precision
+
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData
+import org.apache.flink.cdc.common.types.DataType
+import org.apache.flink.cdc.common.types.DataTypes
+import org.apache.flink.cdc.common.udf.UserDefinedFunction
+
+/** This is an example UDF class for testing purposes only. */
+class LocalZonedTimestampTypeReturningClass extends UserDefinedFunction {
+  override def getReturnType: DataType = DataTypes.TIMESTAMP_LTZ(2)
+  def eval: LocalZonedTimestampData = LocalZonedTimestampData.fromEpochMillis(24 * 60 * 60 * 1000)
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/TimestampTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/TimestampTypeReturningClass.scala
new file mode 100644
index 000000000..958f23e6d
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/TimestampTypeReturningClass.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.scala.precision
+
+import org.apache.flink.cdc.common.data.TimestampData
+import org.apache.flink.cdc.common.types.DataType
+import org.apache.flink.cdc.common.types.DataTypes
+import org.apache.flink.cdc.common.udf.UserDefinedFunction
+
+/** This is an example UDF class for testing purposes only. */
+class TimestampTypeReturningClass extends UserDefinedFunction {
+  override def getReturnType: DataType = DataTypes.TIMESTAMP(2)
+  def eval: TimestampData = TimestampData.fromMillis(24 * 60 * 60 * 1000)
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/VarBinaryTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/VarBinaryTypeReturningClass.scala
new file mode 100644
index 000000000..1f9a376d7
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/VarBinaryTypeReturningClass.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.scala.precision
+
+import org.apache.flink.cdc.common.types.DataType
+import org.apache.flink.cdc.common.types.DataTypes
+import org.apache.flink.cdc.common.udf.UserDefinedFunction
+
+/** This is an example UDF class for testing purposes only. */
+class VarBinaryTypeReturningClass extends UserDefinedFunction {
+  override def getReturnType: DataType = DataTypes.VARBINARY(17)
+  def eval: Array[Byte] = "xyzzy".getBytes
+}
diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/VarCharTypeReturningClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/VarCharTypeReturningClass.scala
new file mode 100644
index 000000000..ad8baaa29
--- /dev/null
+++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/precision/VarCharTypeReturningClass.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples.scala.precision
+
+import org.apache.flink.cdc.common.types.DataType
+import org.apache.flink.cdc.common.types.DataTypes
+import org.apache.flink.cdc.common.udf.UserDefinedFunction
+
+/** This is an example UDF class for testing purposes only. */
+class VarCharTypeReturningClass extends UserDefinedFunction {
+  override def getReturnType: DataType = DataTypes.VARCHAR(17)
+  def eval = "This is a string."
+}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index 4a2674478..0a062d3e8 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.runtime.functions;
 
+import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
@@ -258,6 +259,14 @@ public class SystemFunctionUtils {
         return value.compareTo(minValue) >= 0 && value.compareTo(maxValue) <= 0;
     }
 
+    public static boolean betweenAsymmetric(
+            DecimalData value, DecimalData minValue, DecimalData maxValue) {
+        if (value == null) {
+            return false;
+        }
+        return value.compareTo(minValue) >= 0 && value.compareTo(maxValue) <= 0;
+    }
+
     public static boolean notBetweenAsymmetric(String value, String minValue, String maxValue) {
         return !betweenAsymmetric(value, minValue, maxValue);
     }
@@ -287,6 +296,11 @@ public class SystemFunctionUtils {
         return !betweenAsymmetric(value, minValue, maxValue);
     }
 
+    public static boolean notBetweenAsymmetric(
+            DecimalData value, DecimalData minValue, DecimalData maxValue) {
+        return !betweenAsymmetric(value, minValue, maxValue);
+    }
+
     public static boolean in(String value, String... str) {
         return Arrays.stream(str).anyMatch(item -> value.equals(item));
     }
@@ -315,6 +329,10 @@ public class SystemFunctionUtils {
         return Arrays.stream(values).anyMatch(item -> value.equals(item));
     }
 
+    public static boolean in(DecimalData value, DecimalData... values) {
+        return Arrays.stream(values).anyMatch(item -> value.equals(item));
+    }
+
     public static boolean notIn(String value, String... values) {
         return !in(value, values);
     }
@@ -343,6 +361,10 @@ public class SystemFunctionUtils {
         return !in(value, values);
     }
 
+    public static boolean notIn(DecimalData value, DecimalData... values) {
+        return !in(value, values);
+    }
+
     public static int charLength(String str) {
         return str.length();
     }
@@ -577,11 +599,27 @@ public class SystemFunctionUtils {
         return round(b0, 0);
     }
 
+    /** SQL <code>ROUND</code> operator applied to DecimalData values. */
+    public static DecimalData round(DecimalData b0) {
+        return round(b0, 0);
+    }
+
     /** SQL <code>ROUND</code> operator applied to BigDecimal values. */
     public static BigDecimal round(BigDecimal b0, int b1) {
         return b0.movePointRight(b1).setScale(0, RoundingMode.HALF_UP).movePointLeft(b1);
     }
 
+    /** SQL <code>ROUND</code> operator applied to DecimalData values. */
+    public static DecimalData round(DecimalData b0, int b1) {
+        return DecimalData.fromBigDecimal(
+                b0.toBigDecimal()
+                        .movePointRight(b1)
+                        .setScale(0, RoundingMode.HALF_UP)
+                        .movePointLeft(b1),
+                b0.precision(),
+                b0.scale());
+    }
+
     /** SQL <code>ROUND</code> operator applied to float values. */
     public static float round(float b0) {
         return round(b0, 0);
@@ -649,6 +687,8 @@ public class SystemFunctionUtils {
             return !object.equals(0d);
         } else if (object instanceof BigDecimal) {
             return ((BigDecimal) object).compareTo(BigDecimal.ZERO) != 0;
+        } else if (object instanceof DecimalData) {
+            return ((DecimalData) object).compareTo(DecimalData.zero(1, 0)) != 0;
         }
         return Boolean.valueOf(castToString(object));
     }
@@ -663,6 +703,9 @@ public class SystemFunctionUtils {
         if (object instanceof BigDecimal) {
             return ((BigDecimal) object).byteValue();
         }
+        if (object instanceof DecimalData) {
+            return ((DecimalData) object).toBigDecimal().byteValue();
+        }
         if (object instanceof Double) {
             return ((Double) object).byteValue();
         }
@@ -693,6 +736,9 @@ public class SystemFunctionUtils {
         if (object instanceof BigDecimal) {
             return ((BigDecimal) object).shortValue();
         }
+        if (object instanceof DecimalData) {
+            return ((DecimalData) object).toBigDecimal().shortValue();
+        }
         if (object instanceof Double) {
             return ((Double) object).shortValue();
         }
@@ -723,6 +769,9 @@ public class SystemFunctionUtils {
         if (object instanceof BigDecimal) {
             return ((BigDecimal) object).intValue();
         }
+        if (object instanceof DecimalData) {
+            return ((DecimalData) object).toBigDecimal().intValue();
+        }
         if (object instanceof Double) {
             return ((Double) object).intValue();
         }
@@ -753,6 +802,9 @@ public class SystemFunctionUtils {
         if (object instanceof BigDecimal) {
             return ((BigDecimal) object).longValue();
         }
+        if (object instanceof DecimalData) {
+            return ((DecimalData) object).toBigDecimal().longValue();
+        }
         if (object instanceof Double) {
             return ((Double) object).longValue();
         }
@@ -783,6 +835,9 @@ public class SystemFunctionUtils {
         if (object instanceof BigDecimal) {
             return ((BigDecimal) object).floatValue();
         }
+        if (object instanceof DecimalData) {
+            return ((DecimalData) object).toBigDecimal().floatValue();
+        }
         if (object instanceof Double) {
             return ((Double) object).floatValue();
         }
@@ -806,6 +861,9 @@ public class SystemFunctionUtils {
         if (object instanceof BigDecimal) {
             return ((BigDecimal) object).doubleValue();
         }
+        if (object instanceof DecimalData) {
+            return ((DecimalData) object).toBigDecimal().doubleValue();
+        }
         if (object instanceof Double) {
             return (Double) object;
         }
@@ -843,6 +901,30 @@ public class SystemFunctionUtils {
         return bigDecimal;
     }
 
+    public static DecimalData castToDecimalData(Object object, int precision, int scale) {
+        if (object == null) {
+            return null;
+        }
+        if (object instanceof Boolean) {
+            object = (Boolean) object ? 1 : 0;
+        }
+
+        BigDecimal bigDecimal;
+        try {
+            bigDecimal = new BigDecimal(castObjectIntoString(object), new MathContext(precision));
+            bigDecimal = bigDecimal.setScale(scale, RoundingMode.HALF_UP);
+        } catch (NumberFormatException ignored) {
+            return null;
+        }
+
+        // If the precision overflows, null will be returned. Otherwise, we may accidentally emit a
+        // non-serializable object into the pipeline that breaks downstream.
+        if (bigDecimal.precision() > precision) {
+            return null;
+        }
+        return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+    }
+
     public static TimestampData castToTimestamp(Object object, String timezone) {
         if (object == null) {
             return null;
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index b122aded8..14cb79026 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -470,7 +470,7 @@ public class JaninoCompiler {
                 return new Java.MethodInvocation(
                         Location.NOWHERE,
                         null,
-                        "castToBigDecimal",
+                        "castToDecimalData",
                         newAtoms.toArray(new Java.Rvalue[0]));
             case "CHAR":
             case "VARCHAR":
@@ -496,7 +496,7 @@ public class JaninoCompiler {
             return String.format(
                     "(%s) __instanceOf%s.eval",
                     
DataTypeConverter.convertOriginalClass(udfFunction.getReturnTypeHint())
-                            .getName(),
+                            .getCanonicalName(),
                     udfFunction.getClassName());
         } else {
             return String.format("__instanceOf%s.eval", 
udfFunction.getClassName());
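
The switch to getCanonicalName() matters because DECIMAL return-type hints
now map to DecimalData and binary hints map to byte[]: for array classes,
Class#getName() yields a JVM descriptor that Janino cannot parse as a
source-level cast, while getCanonicalName() yields the Java-source
spelling. A standalone illustration (not project code):

    public class NameVsCanonicalName {
        public static void main(String[] args) {
            System.out.println(byte[].class.getName());          // prints "[B"
            System.out.println(byte[].class.getCanonicalName()); // prints "byte[]"
            // Non-array classes print the same fully qualified name either way.
            System.out.println(java.math.BigDecimal.class.getCanonicalName());
        }
    }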
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index 4dfcd09a5..5c8417633 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -176,8 +176,8 @@ public class TransformParser {
                                 new HepPlanner(new HepProgramBuilder().build()),
                                 new RexBuilder(factory)),
                         StandardConvertletTable.INSTANCE,
-                        SqlToRelConverter.config().withTrimUnusedFields(false));
-        RelRoot relRoot = sqlToRelConverter.convertQuery(validateSqlNode, false, true);
+                        SqlToRelConverter.config().withTrimUnusedFields(true));
+        RelRoot relRoot = sqlToRelConverter.convertQuery(validateSqlNode, false, false);
         return relRoot.rel;
     }
 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformTable.java
index 814ed74e4..34fcc8f57 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformTable.java
@@ -23,9 +23,7 @@ import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.impl.AbstractTable;
-import org.apache.calcite.util.Pair;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /** TransformTable to generate the metadata of calcite. */
@@ -46,14 +44,6 @@ public class TransformTable extends AbstractTable {
 
     @Override
     public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
-        List<String> names = new ArrayList<>();
-        List<RelDataType> types = new ArrayList<>();
-        for (Column column : columns) {
-            names.add(column.getName());
-            RelDataType sqlType =
-                    DataTypeConverter.convertCalciteType(relDataTypeFactory, column.getType());
-            types.add(sqlType);
-        }
-        return relDataTypeFactory.createStructType(Pair.zip(names, types));
+        return DataTypeConverter.convertCalciteRelDataType(relDataTypeFactory, columns);
     }
 }
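
Delegating to DataTypeConverter.convertCalciteRelDataType (new code in the
DataTypeConverter diff below) builds the Calcite row type field by field,
so each column keeps its precision, scale, and nullability instead of being
collapsed to a bare type. A hedged standalone sketch of that builder
pattern against Calcite's default type system:

    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeFactory;
    import org.apache.calcite.rel.type.RelDataTypeSystem;
    import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
    import org.apache.calcite.sql.type.SqlTypeName;

    public class RowTypeSketch {
        public static void main(String[] args) {
            RelDataTypeFactory factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
            // nullable(...) applies to the most recently added field, so
            // precision/scale and nullability both survive per column.
            RelDataType rowType = factory.builder()
                    .add("decimal_col", SqlTypeName.DECIMAL, 10, 3).nullable(true)
                    .add("char_col", SqlTypeName.CHAR, 17).nullable(true)
                    .build();
            System.out.println(rowType.getFullTypeString());
        }
    }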
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java
index 8da220293..cd2542e7e 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java
@@ -27,14 +27,27 @@ import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.BigIntType;
 import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.BooleanType;
+import org.apache.flink.cdc.common.types.CharType;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.DateType;
 import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.IntType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
 import org.apache.flink.cdc.common.types.MapType;
 import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
 import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.TinyIntType;
 import org.apache.flink.cdc.common.types.VarBinaryType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -98,7 +111,7 @@ public class DataTypeConverter {
             case VARBINARY:
                 return byte[].class;
             case DECIMAL:
-                return BigDecimal.class;
+                return DecimalData.class;
             case ROW:
                 return Object.class;
             case ARRAY:
@@ -110,6 +123,172 @@ public class DataTypeConverter {
         }
     }
 
+    public static RelDataType convertCalciteRelDataType(
+            RelDataTypeFactory typeFactory, List<Column> columns) {
+        RelDataTypeFactory.Builder fieldInfoBuilder = typeFactory.builder();
+        for (Column column : columns) {
+            switch (column.getType().getTypeRoot()) {
+                case BOOLEAN:
+                    BooleanType booleanType = (BooleanType) column.getType();
+                    fieldInfoBuilder
+                            .add(column.getName(), SqlTypeName.BOOLEAN)
+                            .nullable(booleanType.isNullable());
+                    break;
+                case TINYINT:
+                    TinyIntType tinyIntType = (TinyIntType) column.getType();
+                    fieldInfoBuilder
+                            .add(column.getName(), SqlTypeName.TINYINT)
+                            .nullable(tinyIntType.isNullable());
+                    break;
+                case SMALLINT:
+                    SmallIntType smallIntType = (SmallIntType) column.getType();
+                    fieldInfoBuilder
+                            .add(column.getName(), SqlTypeName.SMALLINT)
+                            .nullable(smallIntType.isNullable());
+                    break;
+                case INTEGER:
+                    IntType intType = (IntType) column.getType();
+                    fieldInfoBuilder
+                            .add(column.getName(), SqlTypeName.INTEGER)
+                            .nullable(intType.isNullable());
+                    break;
+                case BIGINT:
+                    BigIntType bigIntType = (BigIntType) column.getType();
+                    fieldInfoBuilder
+                            .add(column.getName(), SqlTypeName.BIGINT)
+                            .nullable(bigIntType.isNullable());
+                    break;
+                case DATE:
+                    DateType dataType = (DateType) column.getType();
+                    fieldInfoBuilder
+                            .add(column.getName(), SqlTypeName.DATE)
+                            .nullable(dataType.isNullable());
+                    break;
+                case TIME_WITHOUT_TIME_ZONE:
+                    TimeType timeType = (TimeType) column.getType();
+                    fieldInfoBuilder
+                            .add(
+                                    column.getName(),
+                                    SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE,
+                                    timeType.getPrecision())
+                            .nullable(timeType.isNullable());
+                    break;
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                    TimestampType timestampType = (TimestampType) column.getType();
+                    fieldInfoBuilder
+                            .add(
+                                    column.getName(),
+                                    SqlTypeName.TIMESTAMP,
+                                    timestampType.getPrecision())
+                            .nullable(timestampType.isNullable());
+                    break;
+                case TIMESTAMP_WITH_TIME_ZONE:
+                    ZonedTimestampType zonedTimestampType = (ZonedTimestampType) column.getType();
+                    fieldInfoBuilder
+                            .add(
+                                    column.getName(),
+                                    SqlTypeName.TIMESTAMP,
+                                    zonedTimestampType.getPrecision())
+                            .nullable(zonedTimestampType.isNullable());
+                    break;
+                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                    LocalZonedTimestampType localZonedTimestampType =
+                            (LocalZonedTimestampType) column.getType();
+                    fieldInfoBuilder
+                            .add(
+                                    column.getName(),
+                                    SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+                                    localZonedTimestampType.getPrecision())
+                            .nullable(localZonedTimestampType.isNullable());
+                    break;
+                case FLOAT:
+                    FloatType floatType = (FloatType) column.getType();
+                    fieldInfoBuilder
+                            .add(column.getName(), SqlTypeName.FLOAT)
+                            .nullable(floatType.isNullable());
+                    break;
+                case DOUBLE:
+                    DoubleType doubleType = (DoubleType) column.getType();
+                    fieldInfoBuilder
+                            .add(column.getName(), SqlTypeName.DOUBLE)
+                            .nullable(doubleType.isNullable());
+                    break;
+                case CHAR:
+                    CharType charType = (CharType) column.getType();
+                    fieldInfoBuilder
+                            .add(column.getName(), SqlTypeName.CHAR, charType.getLength())
+                            .nullable(charType.isNullable());
+                    break;
+                case VARCHAR:
+                    VarCharType varCharType = (VarCharType) column.getType();
+                    fieldInfoBuilder
+                            .add(column.getName(), SqlTypeName.VARCHAR, varCharType.getLength())
+                            .nullable(varCharType.isNullable());
+                    break;
+                case BINARY:
+                    BinaryType binaryType = (BinaryType) column.getType();
+                    fieldInfoBuilder
+                            .add(column.getName(), SqlTypeName.BINARY, binaryType.getLength())
+                            .nullable(binaryType.isNullable());
+                    break;
+                case VARBINARY:
+                    VarBinaryType varBinaryType = (VarBinaryType) column.getType();
+                    fieldInfoBuilder
+                            .add(column.getName(), SqlTypeName.VARBINARY, varBinaryType.getLength())
+                            .nullable(varBinaryType.isNullable());
+                    break;
+                case DECIMAL:
+                    DecimalType decimalType = (DecimalType) column.getType();
+                    fieldInfoBuilder
+                            .add(
+                                    column.getName(),
+                                    SqlTypeName.DECIMAL,
+                                    decimalType.getPrecision(),
+                                    decimalType.getScale())
+                            .nullable(decimalType.isNullable());
+                    break;
+                case ROW:
+                    List<RelDataType> dataTypes =
+                            ((RowType) column.getType())
+                                    .getFieldTypes().stream()
+                                            .map((type) -> convertCalciteType(typeFactory, type))
+                                            .collect(Collectors.toList());
+                    fieldInfoBuilder
+                            .add(
+                                    column.getName(),
+                                    typeFactory.createStructType(
+                                            dataTypes,
+                                            ((RowType) column.getType()).getFieldNames()))
+                            .nullable(true);
+                    break;
+                case ARRAY:
+                    DataType elementType = ((ArrayType) column.getType()).getElementType();
+                    fieldInfoBuilder
+                            .add(
+                                    column.getName(),
+                                    typeFactory.createArrayType(
+                                            convertCalciteType(typeFactory, elementType), -1))
+                            .nullable(true);
+                    break;
+                case MAP:
+                    RelDataType keyType =
+                            convertCalciteType(
+                                    typeFactory, ((MapType) column.getType()).getKeyType());
+                    RelDataType valueType =
+                            convertCalciteType(
+                                    typeFactory, ((MapType) column.getType()).getValueType());
+                    fieldInfoBuilder
+                            .add(column.getName(), typeFactory.createMapType(keyType, valueType))
+                            .nullable(true);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported type: " + column.getType());
+            }
+        }
+        return fieldInfoBuilder.build();
+    }
+
     public static RelDataType convertCalciteType(
             RelDataTypeFactory typeFactory, DataType dataType) {
         switch (dataType.getTypeRoot()) {
@@ -126,25 +305,43 @@ public class DataTypeConverter {
             case DATE:
                 return typeFactory.createSqlType(SqlTypeName.DATE);
             case TIME_WITHOUT_TIME_ZONE:
-                return typeFactory.createSqlType(SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE);
+                TimeType timeType = (TimeType) dataType;
+                return typeFactory.createSqlType(
+                        SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE, timeType.getPrecision());
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+                TimestampType timestampType = (TimestampType) dataType;
+                return typeFactory.createSqlType(
+                        SqlTypeName.TIMESTAMP, timestampType.getPrecision());
+            case TIMESTAMP_WITH_TIME_ZONE:
+                ZonedTimestampType zonedTimestampType = (ZonedTimestampType) dataType;
+                return typeFactory.createSqlType(
+                        SqlTypeName.TIMESTAMP, zonedTimestampType.getPrecision());
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+                LocalZonedTimestampType localZonedTimestampType =
+                        (LocalZonedTimestampType) dataType;
+                return typeFactory.createSqlType(
+                        SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+                        localZonedTimestampType.getPrecision());
             case FLOAT:
                 return typeFactory.createSqlType(SqlTypeName.FLOAT);
             case DOUBLE:
                 return typeFactory.createSqlType(SqlTypeName.DOUBLE);
             case CHAR:
-                return typeFactory.createSqlType(SqlTypeName.CHAR);
+                CharType charType = (CharType) dataType;
+                return typeFactory.createSqlType(SqlTypeName.CHAR, charType.getLength());
             case VARCHAR:
-                return typeFactory.createSqlType(SqlTypeName.VARCHAR);
+                VarCharType varCharType = (VarCharType) dataType;
+                return typeFactory.createSqlType(SqlTypeName.VARCHAR, varCharType.getLength());
             case BINARY:
-                return typeFactory.createSqlType(SqlTypeName.BINARY);
+                BinaryType binaryType = (BinaryType) dataType;
+                return typeFactory.createSqlType(SqlTypeName.BINARY, binaryType.getLength());
             case VARBINARY:
-                return typeFactory.createSqlType(SqlTypeName.VARBINARY);
+                VarBinaryType varBinaryType = (VarBinaryType) dataType;
+                return typeFactory.createSqlType(SqlTypeName.VARBINARY, varBinaryType.getLength());
             case DECIMAL:
-                return typeFactory.createSqlType(SqlTypeName.DECIMAL);
+                DecimalType decimalType = (DecimalType) dataType;
+                return typeFactory.createSqlType(
+                        SqlTypeName.DECIMAL, decimalType.getPrecision(), decimalType.getScale());
             case ROW:
                 List<RelDataType> dataTypes =
                         ((RowType) dataType)
@@ -197,9 +394,9 @@ public class DataTypeConverter {
             case VARCHAR:
                 return DataTypes.STRING();
             case BINARY:
-                return DataTypes.BINARY(BinaryType.MAX_LENGTH);
+                return DataTypes.BINARY(relDataType.getPrecision());
             case VARBINARY:
-                return DataTypes.VARBINARY(VarBinaryType.MAX_LENGTH);
+                return DataTypes.VARBINARY(relDataType.getPrecision());
             case DECIMAL:
                 return DataTypes.DECIMAL(relDataType.getPrecision(), relDataType.getScale());
             case ARRAY:
@@ -298,7 +495,7 @@ public class DataTypeConverter {
             case VARBINARY:
                 return convertToBinary(value);
             case DECIMAL:
-                return convertToDecimalOriginal(value);
+                return convertToDecimal(value);
             case ROW:
                 return value;
             case ARRAY:
@@ -620,6 +817,7 @@ public class DataTypeConverter {
         }
     }
 
+    // convert to DecimalData
     private static Object convertToDecimal(Object obj) {
         if (obj instanceof BigDecimal) {
             BigDecimal bigDecimalValue = (BigDecimal) obj;
@@ -632,16 +830,4 @@ public class DataTypeConverter {
                     "Unsupported Decimal value type: " + 
obj.getClass().getSimpleName());
         }
     }
-
-    private static Object convertToDecimalOriginal(Object obj) {
-        if (obj instanceof BigDecimal) {
-            return obj;
-        } else if (obj instanceof DecimalData) {
-            DecimalData decimalData = (DecimalData) obj;
-            return decimalData.toBigDecimal();
-        } else {
-            throw new UnsupportedOperationException(
-                    "Unsupported Decimal value type: " + obj.getClass().getSimpleName());
-        }
-    }
 }
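A hypothetical round-trip showing what the new converter preserves; the column names here are invented for illustration, and Calcite's JavaTypeFactoryImpl stands in for whatever type factory the transform planner actually supplies:

    // Hypothetical usage sketch, not part of the patch.
    RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
    List<Column> columns =
            Arrays.asList(
                    Column.physicalColumn("deposit", DataTypes.DECIMAL(10, 2), "deposit"),
                    Column.physicalColumn("name", DataTypes.VARCHAR(50), "name"));
    RelDataType rowType = DataTypeConverter.convertCalciteRelDataType(typeFactory, columns);
    // Previously each field collapsed to a bare SQL type with default precision;
    // now rowType reports DECIMAL(10, 2) and VARCHAR(50) for these fields.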
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 3c5aae6db..fc5dfd447 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -303,7 +303,7 @@ public class TransformParserTest {
         testFilterExpression("cast(1 as bigint)", "castToLong(1)");
         testFilterExpression("cast(1 as float)", "castToFloat(1)");
         testFilterExpression("cast(1 as double)", "castToDouble(1)");
-        testFilterExpression("cast(1 as decimal)", "castToBigDecimal(1, 10, 0)");
+        testFilterExpression("cast(1 as decimal)", "castToDecimalData(1, 10, 0)");
         testFilterExpression("cast(1 as char)", "castToString(1)");
         testFilterExpression("cast(1 as varchar)", "castToString(1)");
         testFilterExpression("cast(null as int)", "castToInteger(null)");
@@ -314,7 +314,7 @@ public class TransformParserTest {
         testFilterExpression("cast(null as bigint)", "castToLong(null)");
         testFilterExpression("cast(null as float)", "castToFloat(null)");
         testFilterExpression("cast(null as double)", "castToDouble(null)");
-        testFilterExpression("cast(null as decimal)", "castToBigDecimal(null, 10, 0)");
+        testFilterExpression("cast(null as decimal)", "castToDecimalData(null, 10, 0)");
         testFilterExpression("cast(null as char)", "castToString(null)");
         testFilterExpression("cast(null as varchar)", "castToString(null)");
         testFilterExpression(
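The expected scripts in the hunk above now route DECIMAL casts through a helper that returns DecimalData rather than BigDecimal. A plausible shape for such a helper (hypothetical body with imports omitted; the real castToDecimalData lives in SystemFunctionUtils and may differ):

    // Hypothetical sketch of a precision-aware cast helper.
    public static DecimalData castToDecimalData(Object value, int precision, int scale) {
        if (value == null) {
            return null;
        }
        // Normalize through BigDecimal, then wrap it with the target
        // DECIMAL(precision, scale) so the type travels with the value.
        BigDecimal decimal = new BigDecimal(String.valueOf(value));
        return DecimalData.fromBigDecimal(decimal, precision, scale);
    }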
@@ -328,15 +328,18 @@ public class TransformParserTest {
         List<Column> testColumns =
                 Arrays.asList(
                         Column.physicalColumn("id", DataTypes.INT(), "id"),
-                        Column.physicalColumn("name", DataTypes.STRING(), "string"),
+                        Column.physicalColumn("name", DataTypes.STRING(), "name"),
                         Column.physicalColumn("age", DataTypes.INT(), "age"),
-                        Column.physicalColumn("address", DataTypes.STRING(), "address"),
+                        Column.physicalColumn(
+                                "createTime", DataTypes.TIMESTAMP(3), "newCreateTime"),
+                        Column.physicalColumn("address", DataTypes.VARCHAR(50), "newAddress"),
+                        Column.physicalColumn("deposit", DataTypes.DECIMAL(10, 2), "deposit"),
                         Column.physicalColumn("weight", DataTypes.DOUBLE(), "weight"),
                         Column.physicalColumn("height", DataTypes.DOUBLE(), "height"));
 
         List<ProjectionColumn> result =
                 TransformParser.generateProjectionColumns(
-                        "id, upper(name) as name, age + 1 as newage, weight / (height * height) as bmi",
+                        "id, upper(name) as name, age + 1 as newage, createTime as newCreateTime, address as newAddress, deposit as deposits, weight / (height * height) as bmi",
                         testColumns,
                         Collections.emptyList(),
                         new SupportedMetadataColumn[0]);
@@ -346,6 +349,9 @@ public class TransformParserTest {
                         "ProjectionColumn{column=`id` INT 'id', 
expression='id', scriptExpression='id', originalColumnNames=[id], 
transformExpressionKey=null}",
                         "ProjectionColumn{column=`name` STRING, 
expression='UPPER(`TB`.`name`)', scriptExpression='upper(name)', 
originalColumnNames=[name], transformExpressionKey=null}",
                         "ProjectionColumn{column=`newage` INT, 
expression='`TB`.`age` + 1', scriptExpression='age + 1', 
originalColumnNames=[age], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`newCreateTime` TIMESTAMP(3) 
'newCreateTime', expression='createTime', scriptExpression='createTime', 
originalColumnNames=[createTime], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`newAddress` VARCHAR(50) 
'newAddress', expression='address', scriptExpression='address', 
originalColumnNames=[address], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`deposits` DECIMAL(10, 2) 
'deposit', expression='deposit', scriptExpression='deposit', 
originalColumnNames=[deposit], transformExpressionKey=null}",
                         "ProjectionColumn{column=`bmi` DOUBLE, 
expression='`TB`.`weight` / (`TB`.`height` * `TB`.`height`)', 
scriptExpression='weight / height * height', originalColumnNames=[weight, 
height, height], transformExpressionKey=null}");
         Assertions.assertThat(result).hasToString("[" + String.join(", ", 
expected) + "]");
 
@@ -359,9 +365,11 @@ public class TransformParserTest {
         List<String> metadataExpected =
                 Arrays.asList(
                         "ProjectionColumn{column=`id` INT 'id', 
expression='id', scriptExpression='id', originalColumnNames=[id], 
transformExpressionKey=null}",
-                        "ProjectionColumn{column=`name` STRING 'string', 
expression='name', scriptExpression='name', originalColumnNames=[name], 
transformExpressionKey=null}",
+                        "ProjectionColumn{column=`name` STRING 'name', 
expression='name', scriptExpression='name', originalColumnNames=[name], 
transformExpressionKey=null}",
                         "ProjectionColumn{column=`age` INT 'age', 
expression='age', scriptExpression='age', originalColumnNames=[age], 
transformExpressionKey=null}",
-                        "ProjectionColumn{column=`address` STRING 'address', 
expression='address', scriptExpression='address', 
originalColumnNames=[address], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`createTime` TIMESTAMP(3) 
'newCreateTime', expression='createTime', scriptExpression='createTime', 
originalColumnNames=[createTime], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`address` VARCHAR(50) 
'newAddress', expression='address', scriptExpression='address', 
originalColumnNames=[address], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`deposit` DECIMAL(10, 2) 
'deposit', expression='deposit', scriptExpression='deposit', 
originalColumnNames=[deposit], transformExpressionKey=null}",
                         "ProjectionColumn{column=`weight` DOUBLE 'weight', 
expression='weight', scriptExpression='weight', originalColumnNames=[weight], 
transformExpressionKey=null}",
                         "ProjectionColumn{column=`height` DOUBLE 'height', 
expression='height', scriptExpression='height', originalColumnNames=[height], 
transformExpressionKey=null}",
                         "ProjectionColumn{column=`__namespace_name__` STRING 
NOT NULL, expression='__namespace_name__', 
scriptExpression='__namespace_name__', 
originalColumnNames=[__namespace_name__], transformExpressionKey=null}",
@@ -384,6 +392,42 @@ public class TransformParserTest {
                         "Unrecognized projection expression: 1 + 1. Should be 
<EXPR> AS <IDENTIFIER>");
     }
 
+    @Test
+    public void testGenerateProjectionColumnsWithPrecision() {
+        List<Column> testColumns =
+                Arrays.asList(
+                        Column.physicalColumn("id", DataTypes.INT(), "id"),
+                        Column.physicalColumn("name", DataTypes.VARCHAR(50), "name"),
+                        Column.physicalColumn("sex", DataTypes.CHAR(1), "sex"),
+                        Column.physicalColumn("address", DataTypes.BINARY(50), "address"),
+                        Column.physicalColumn("phone", DataTypes.VARBINARY(50), "phone"),
+                        Column.physicalColumn("deposit", DataTypes.DECIMAL(10, 2), "deposit"),
+                        Column.physicalColumn("birthday", DataTypes.TIMESTAMP(3), "birthday"),
+                        Column.physicalColumn(
+                                "birthday_ltz", DataTypes.TIMESTAMP_LTZ(3), "birthday_ltz"),
+                        Column.physicalColumn("update_time", DataTypes.TIME(3), "update_time"));
+
+        List<ProjectionColumn> result =
+                TransformParser.generateProjectionColumns(
+                        "id, UPPER(name) as name2, UPPER(sex) as sex2, COALESCE(address,address) as address2, COALESCE(phone,phone) as phone2, COALESCE(deposit,deposit) as deposit2, COALESCE(birthday,birthday) as birthday2, COALESCE(birthday_ltz,birthday_ltz) as birthday_ltz2, COALESCE(update_time,update_time) as update_time2",
+                        testColumns,
+                        Collections.emptyList(),
+                        new SupportedMetadataColumn[0]);
+
+        List<String> expected =
+                Arrays.asList(
+                        "ProjectionColumn{column=`id` INT 'id', expression='id', scriptExpression='id', originalColumnNames=[id], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`name2` STRING, expression='UPPER(`TB`.`name`)', scriptExpression='upper(name)', originalColumnNames=[name], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`sex2` STRING, expression='UPPER(`TB`.`sex`)', scriptExpression='upper(sex)', originalColumnNames=[sex], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`address2` BINARY(50), expression='CASE WHEN `TB`.`address` IS NOT NULL THEN `TB`.`address` ELSE `TB`.`address` END', scriptExpression='(null != address ? address : address)', originalColumnNames=[address, address, address], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`phone2` VARBINARY(50), expression='CASE WHEN `TB`.`phone` IS NOT NULL THEN `TB`.`phone` ELSE `TB`.`phone` END', scriptExpression='(null != phone ? phone : phone)', originalColumnNames=[phone, phone, phone], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`deposit2` DECIMAL(10, 2), expression='CASE WHEN `TB`.`deposit` IS NOT NULL THEN `TB`.`deposit` ELSE `TB`.`deposit` END', scriptExpression='(null != deposit ? deposit : deposit)', originalColumnNames=[deposit, deposit, deposit], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`birthday2` TIMESTAMP(3), expression='CASE WHEN `TB`.`birthday` IS NOT NULL THEN `TB`.`birthday` ELSE `TB`.`birthday` END', scriptExpression='(null != birthday ? birthday : birthday)', originalColumnNames=[birthday, birthday, birthday], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`birthday_ltz2` TIMESTAMP_LTZ(3), expression='CASE WHEN `TB`.`birthday_ltz` IS NOT NULL THEN `TB`.`birthday_ltz` ELSE `TB`.`birthday_ltz` END', scriptExpression='(null != birthday_ltz ? birthday_ltz : birthday_ltz)', originalColumnNames=[birthday_ltz, birthday_ltz, birthday_ltz], transformExpressionKey=null}",
+                        "ProjectionColumn{column=`update_time2` TIME(3), expression='CASE WHEN `TB`.`update_time` IS NOT NULL THEN `TB`.`update_time` ELSE `TB`.`update_time` END', scriptExpression='(null != update_time ? update_time : update_time)', originalColumnNames=[update_time, update_time, update_time], transformExpressionKey=null}");
+        Assertions.assertThat(result).hasToString("[" + String.join(", ", expected) + "]");
+    }
+
     @Test
     public void testGenerateReferencedColumns() {
         List<Column> testColumns =