This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 1388cf990 [FLINK-34877][cdc] Support type cast conversion in pipeline
transform
1388cf990 is described below
commit 1388cf99061b36f398aaf593a634a12e7b784c7f
Author: Wink <[email protected]>
AuthorDate: Wed Jul 31 23:32:39 2024 +0800
[FLINK-34877][cdc] Support type cast conversion in pipeline transform
This closes #3357.
---
.../cdc/runtime/functions/SystemFunctionUtils.java | 83 +++
.../flink/cdc/runtime/parser/JaninoCompiler.java | 66 ++
.../data/writer/AbstractBinaryWriter.java | 2 +-
.../transform/TransformDataOperatorTest.java | 663 +++++++++++++++++++++
.../cdc/runtime/parser/TransformParserTest.java | 28 +
5 files changed, 841 insertions(+), 1 deletion(-)
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index f3dae8c1a..34c299567 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
+import java.math.MathContext;
import java.math.RoundingMode;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -510,4 +511,86 @@ public class SystemFunctionUtils {
}
return null;
}
+
+ public static String castToString(Object object) {
+ if (object == null) {
+ return null;
+ }
+ return object.toString();
+ }
+
+ public static Byte castToByte(Object object) {
+ if (object == null) {
+ return null;
+ }
+ return Byte.valueOf(castObjectIntoString(object));
+ }
+
+ public static Boolean castToBoolean(Object object) {
+ if (object == null) {
+ return null;
+ }
+ if (object instanceof Byte
+ || object instanceof Short
+ || object instanceof Integer
+ || object instanceof Long
+ || object instanceof Float
+ || object instanceof Double
+ || object instanceof BigDecimal) {
+ return !object.equals(0);
+ }
+ return Boolean.valueOf(castToString(object));
+ }
+
+ public static Short castToShort(Object object) {
+ if (object == null) {
+ return null;
+ }
+ return Short.valueOf(castObjectIntoString(object));
+ }
+
+ public static Integer castToInteger(Object object) {
+ if (object == null) {
+ return null;
+ }
+ return Integer.valueOf(castObjectIntoString(object));
+ }
+
+ public static Long castToLong(Object object) {
+ if (object == null) {
+ return null;
+ }
+ return Long.valueOf(castObjectIntoString(object));
+ }
+
+ public static Float castToFloat(Object object) {
+ if (object == null) {
+ return null;
+ }
+ return Float.valueOf(castObjectIntoString(object));
+ }
+
+ public static Double castToDouble(Object object) {
+ if (object == null) {
+ return null;
+ }
+ return Double.valueOf(castObjectIntoString(object));
+ }
+
+ public static BigDecimal castToBigDecimal(Object object, int precision,
int scale) {
+ if (object == null) {
+ return null;
+ }
+ BigDecimal bigDecimal =
+ new BigDecimal(castObjectIntoString(object), new
MathContext(precision));
+ bigDecimal = bigDecimal.setScale(scale, BigDecimal.ROUND_HALF_UP);
+ return bigDecimal;
+ }
+
+ private static String castObjectIntoString(Object object) {
+ if (object instanceof Boolean) {
+ return Boolean.valueOf(castToString(object)) ? "1" : "0";
+ }
+ return String.valueOf(object);
+ }
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index 5af9755ed..2dd1b8402 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -22,7 +22,9 @@ import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlBasicTypeNameSpec;
import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
@@ -228,6 +230,8 @@ public class JaninoCompiler {
case LESS_THAN_OR_EQUAL:
case GREATER_THAN_OR_EQUAL:
return generateBinaryOperation(sqlBasicCall, atoms,
sqlBasicCall.getKind().sql);
+ case CAST:
+ return generateCastOperation(sqlBasicCall, atoms);
case OTHER:
return generateOtherOperation(sqlBasicCall, atoms);
default:
@@ -256,6 +260,16 @@ public class JaninoCompiler {
Location.NOWHERE, null,
StringUtils.convertToCamelCase("VALUE_EQUALS"), atoms);
}
+ private static Java.Rvalue generateCastOperation(
+ SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+ if (atoms.length != 1) {
+ throw new ParseException("Unrecognized expression: " +
sqlBasicCall.toString());
+ }
+ List<SqlNode> operandList = sqlBasicCall.getOperandList();
+ SqlDataTypeSpec sqlDataTypeSpec = (SqlDataTypeSpec) operandList.get(1);
+ return generateTypeConvertMethod(sqlDataTypeSpec, atoms);
+ }
+
private static Java.Rvalue generateOtherOperation(
SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
if (sqlBasicCall.getOperator().getName().equals("||")) {
@@ -298,4 +312,56 @@ public class JaninoCompiler {
StringUtils.convertToCamelCase(operationName),
timestampFunctionParam.toArray(new Java.Rvalue[0]));
}
+
+ private static Java.Rvalue generateTypeConvertMethod(
+ SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] atoms) {
+ switch (sqlDataTypeSpec.getTypeName().getSimple().toUpperCase()) {
+ case "BOOLEAN":
+ return new Java.MethodInvocation(Location.NOWHERE, null,
"castToBoolean", atoms);
+ case "TINYINT":
+ return new Java.MethodInvocation(Location.NOWHERE, null,
"castToByte", atoms);
+ case "SMALLINT":
+ return new Java.MethodInvocation(Location.NOWHERE, null,
"castToShort", atoms);
+ case "INTEGER":
+ return new Java.MethodInvocation(Location.NOWHERE, null,
"castToInteger", atoms);
+ case "BIGINT":
+ return new Java.MethodInvocation(Location.NOWHERE, null,
"castToLong", atoms);
+ case "FLOAT":
+ return new Java.MethodInvocation(Location.NOWHERE, null,
"castToFloat", atoms);
+ case "DOUBLE":
+ return new Java.MethodInvocation(Location.NOWHERE, null,
"castToDouble", atoms);
+ case "DECIMAL":
+ int precision = 10;
+ int scale = 0;
+ if (sqlDataTypeSpec.getTypeNameSpec() instanceof
SqlBasicTypeNameSpec) {
+ SqlBasicTypeNameSpec typeNameSpec =
+ (SqlBasicTypeNameSpec)
sqlDataTypeSpec.getTypeNameSpec();
+ if (typeNameSpec.getPrecision() > -1) {
+ precision = typeNameSpec.getPrecision();
+ }
+ if (typeNameSpec.getScale() > -1) {
+ scale = typeNameSpec.getScale();
+ }
+ }
+ List<Java.Rvalue> newAtoms = new
ArrayList<>(Arrays.asList(atoms));
+ newAtoms.add(
+ new Java.AmbiguousName(
+ Location.NOWHERE, new String[]
{String.valueOf(precision)}));
+ newAtoms.add(
+ new Java.AmbiguousName(
+ Location.NOWHERE, new String[]
{String.valueOf(scale)}));
+ return new Java.MethodInvocation(
+ Location.NOWHERE,
+ null,
+ "castToBigDecimal",
+ newAtoms.toArray(new Java.Rvalue[0]));
+ case "CHAR":
+ case "VARCHAR":
+ case "STRING":
+ return new Java.MethodInvocation(Location.NOWHERE, null,
"castToString", atoms);
+ default:
+ throw new ParseException(
+ "Unsupported data type cast: " +
sqlDataTypeSpec.toString());
+ }
+ }
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
index acf3dac62..1422935ea 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
@@ -138,7 +138,7 @@ abstract class AbstractBinaryWriter implements BinaryWriter
{
@Override
public void writeDecimal(int pos, DecimalData value, int precision) {
- assert value == null || (value.precision() == precision);
+ assert value == null || (value.precision() <= precision);
if (DecimalData.isCompact(precision)) {
assert value != null;
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
index 09d12e823..2213127a4 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java
@@ -115,6 +115,43 @@ public class TransformDataOperatorTest {
.primaryKey("col1")
.build();
+ private static final TableId NULL_TABLEID =
+ TableId.tableId("my_company", "my_branch", "data_null");
+ private static final Schema NULL_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("colString", DataTypes.STRING())
+ .physicalColumn("nullInt", DataTypes.INT())
+ .physicalColumn("nullBoolean", DataTypes.BOOLEAN())
+ .physicalColumn("nullTinyint", DataTypes.TINYINT())
+ .physicalColumn("nullSmallint", DataTypes.SMALLINT())
+ .physicalColumn("nullBigint", DataTypes.BIGINT())
+ .physicalColumn("nullFloat", DataTypes.FLOAT())
+ .physicalColumn("nullDouble", DataTypes.DOUBLE())
+ .physicalColumn("nullChar", DataTypes.CHAR(1))
+ .physicalColumn("nullVarchar", DataTypes.VARCHAR(1))
+ .physicalColumn("nullDecimal", DataTypes.DECIMAL(4, 2))
+ .primaryKey("col1")
+ .build();
+
+ private static final TableId CAST_TABLEID =
+ TableId.tableId("my_company", "my_branch", "data_cast");
+ private static final Schema CAST_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("castInt", DataTypes.INT())
+ .physicalColumn("castBoolean", DataTypes.BOOLEAN())
+ .physicalColumn("castTinyint", DataTypes.TINYINT())
+ .physicalColumn("castSmallint", DataTypes.SMALLINT())
+ .physicalColumn("castBigint", DataTypes.BIGINT())
+ .physicalColumn("castFloat", DataTypes.FLOAT())
+ .physicalColumn("castDouble", DataTypes.DOUBLE())
+ .physicalColumn("castChar", DataTypes.CHAR(1))
+ .physicalColumn("castVarchar", DataTypes.VARCHAR(1))
+ .physicalColumn("castDecimal", DataTypes.DECIMAL(4, 2))
+ .primaryKey("col1")
+ .build();
+
private static final TableId CONDITION_TABLEID =
TableId.tableId("my_company", "my_branch", "condition_table");
private static final Schema CONDITION_SCHEMA =
@@ -555,6 +592,612 @@ public class TransformDataOperatorTest {
.isEqualTo(new StreamRecord<>(insertEventExpect));
}
+ @Test
+ void testNullCastTransform() throws Exception {
+ TransformDataOperator transform =
+ TransformDataOperator.newBuilder()
+ .addTransform(
+ NULL_TABLEID.identifier(),
+ "col1"
+ + ",colString"
+ + ",cast(colString as int) as nullInt"
+ + ",cast(colString as boolean) as
nullBoolean"
+ + ",cast(colString as tinyint) as
nullTinyint"
+ + ",cast(colString as smallint) as
nullSmallint"
+ + ",cast(colString as bigint) as
nullBigint"
+ + ",cast(colString as float) as
nullFloat"
+ + ",cast(colString as double) as
nullDouble"
+ + ",cast(colString as char) as
nullChar"
+ + ",cast(colString as varchar) as
nullVarchar"
+ + ",cast(colString as DECIMAL(4,2)) as
nullDecimal",
+ null)
+ .build();
+ EventOperatorTestHarness<TransformDataOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ new EventOperatorTestHarness<>(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent = new CreateTableEvent(NULL_TABLEID,
NULL_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType)
NULL_SCHEMA.toRowDataType()));
+ // Insert
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ NULL_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ }));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(new
CreateTableEvent(NULL_TABLEID, NULL_SCHEMA)));
+ transform.processElement(new StreamRecord<>(insertEvent));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEvent));
+ }
+
+ @Test
+ void testCastTransform() throws Exception {
+ TransformDataOperator transform =
+ TransformDataOperator.newBuilder()
+ .addTransform(
+ CAST_TABLEID.identifier(),
+ "col1"
+ + ",cast(col1 as int) as castInt"
+ + ",cast(col1 as boolean) as
castBoolean"
+ + ",cast(col1 as tinyint) as
castTinyint"
+ + ",cast(col1 as smallint) as
castSmallint"
+ + ",cast(col1 as bigint) as castBigint"
+ + ",cast(col1 as float) as castFloat"
+ + ",cast(col1 as double) as castDouble"
+ + ",cast(col1 as char) as castChar"
+ + ",cast(col1 as varchar) as
castVarchar"
+ + ",cast(col1 as DECIMAL(4,2)) as
castDecimal",
+ "col1 = '1'")
+ .addTransform(
+ CAST_TABLEID.identifier(),
+ "col1"
+ + ",cast(castInt as int) as castInt"
+ + ",cast(castInt as boolean) as
castBoolean"
+ + ",cast(castInt as tinyint) as
castTinyint"
+ + ",cast(castInt as smallint) as
castSmallint"
+ + ",cast(castInt as bigint) as
castBigint"
+ + ",cast(castInt as float) as
castFloat"
+ + ",cast(castInt as double) as
castDouble"
+ + ",cast(castInt as char) as castChar"
+ + ",cast(castInt as varchar) as
castVarchar"
+ + ",cast(castInt as DECIMAL(4,2)) as
castDecimal",
+ "col1 = '2'")
+ .addTransform(
+ CAST_TABLEID.identifier(),
+ "col1"
+ + ",cast(castBoolean as int) as
castInt"
+ + ",cast(castBoolean as boolean) as
castBoolean"
+ + ",cast(castBoolean as tinyint) as
castTinyint"
+ + ",cast(castBoolean as smallint) as
castSmallint"
+ + ",cast(castBoolean as bigint) as
castBigint"
+ + ",cast(castBoolean as float) as
castFloat"
+ + ",cast(castBoolean as double) as
castDouble"
+ + ",cast(castBoolean as char) as
castChar"
+ + ",cast(castBoolean as varchar) as
castVarchar"
+ + ",cast(castBoolean as DECIMAL(4,2))
as castDecimal",
+ "col1 = '3'")
+ .addTransform(
+ CAST_TABLEID.identifier(),
+ "col1"
+ + ",cast(castTinyint as int) as
castInt"
+ + ",cast(castTinyint as boolean) as
castBoolean"
+ + ",cast(castTinyint as tinyint) as
castTinyint"
+ + ",cast(castTinyint as smallint) as
castSmallint"
+ + ",cast(castTinyint as bigint) as
castBigint"
+ + ",cast(castTinyint as float) as
castFloat"
+ + ",cast(castTinyint as double) as
castDouble"
+ + ",cast(castTinyint as char) as
castChar"
+ + ",cast(castTinyint as varchar) as
castVarchar"
+ + ",cast(castTinyint as DECIMAL(4,2))
as castDecimal",
+ "col1 = '4'")
+ .addTransform(
+ CAST_TABLEID.identifier(),
+ "col1"
+ + ",cast(castSmallint as int) as
castInt"
+ + ",cast(castSmallint as boolean) as
castBoolean"
+ + ",cast(castSmallint as tinyint) as
castTinyint"
+ + ",cast(castSmallint as smallint) as
castSmallint"
+ + ",cast(castSmallint as bigint) as
castBigint"
+ + ",cast(castSmallint as float) as
castFloat"
+ + ",cast(castSmallint as double) as
castDouble"
+ + ",cast(castSmallint as char) as
castChar"
+ + ",cast(castSmallint as varchar) as
castVarchar"
+ + ",cast(castSmallint as DECIMAL(4,2))
as castDecimal",
+ "col1 = '5'")
+ .addTransform(
+ CAST_TABLEID.identifier(),
+ "col1"
+ + ",cast(castBigint as int) as castInt"
+ + ",cast(castBigint as boolean) as
castBoolean"
+ + ",cast(castBigint as tinyint) as
castTinyint"
+ + ",cast(castBigint as smallint) as
castSmallint"
+ + ",cast(castBigint as bigint) as
castBigint"
+ + ",cast(castBigint as float) as
castFloat"
+ + ",cast(castBigint as double) as
castDouble"
+ + ",cast(castBigint as char) as
castChar"
+ + ",cast(castBigint as varchar) as
castVarchar"
+ + ",cast(castBigint as DECIMAL(4,2))
as castDecimal",
+ "col1 = '6'")
+ .addTransform(
+ CAST_TABLEID.identifier(),
+ "col1"
+ + ",castInt"
+ + ",cast(castFloat as boolean) as
castBoolean"
+ + ",castTinyint"
+ + ",castSmallint"
+ + ",castBigint"
+ + ",cast(castFloat as float) as
castFloat"
+ + ",cast(castFloat as double) as
castDouble"
+ + ",cast(castFloat as char) as
castChar"
+ + ",cast(castFloat as varchar) as
castVarchar"
+ + ",cast(castFloat as DECIMAL(4,2)) as
castDecimal",
+ "col1 = '7'")
+ .addTransform(
+ CAST_TABLEID.identifier(),
+ "col1"
+ + ",castInt"
+ + ",cast(castDouble as boolean) as
castBoolean"
+ + ",castTinyint"
+ + ",castSmallint"
+ + ",castBigint"
+ + ",cast(castDouble as float) as
castFloat"
+ + ",cast(castDouble as double) as
castDouble"
+ + ",cast(castDouble as char) as
castChar"
+ + ",cast(castDouble as varchar) as
castVarchar"
+ + ",cast(castDouble as DECIMAL(4,2))
as castDecimal",
+ "col1 = '8'")
+ .addTransform(
+ CAST_TABLEID.identifier(),
+ "col1"
+ + ",castInt"
+ + ",cast(castDecimal as boolean) as
castBoolean"
+ + ",castTinyint"
+ + ",castSmallint"
+ + ",castBigint"
+ + ",cast(castDecimal as float) as
castFloat"
+ + ",cast(castDecimal as double) as
castDouble"
+ + ",cast(castDecimal as char) as
castChar"
+ + ",cast(castDecimal as varchar) as
castVarchar"
+ + ",cast(castDecimal as DECIMAL(4,2))
as castDecimal",
+ "col1 = '9'")
+ .build();
+ EventOperatorTestHarness<TransformDataOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ new EventOperatorTestHarness<>(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent = new CreateTableEvent(CAST_TABLEID,
CAST_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType)
CAST_SCHEMA.toRowDataType()));
+ // Insert
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ }));
+ DataChangeEvent insertEventExpect1 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new Integer(1),
+ new Boolean(false),
+ new Byte("1"),
+ new Short("1"),
+ new Long(1),
+ new Float(1.0f),
+ new Double(1.0d),
+ new BinaryStringData("1"),
+ new BinaryStringData("1"),
+ DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ }));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(new
CreateTableEvent(CAST_TABLEID, CAST_SCHEMA)));
+ transform.processElement(new StreamRecord<>(insertEvent1));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect1));
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("2"),
+ new Integer(1),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ }));
+ DataChangeEvent insertEventExpect2 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("2"),
+ new Integer(1),
+ new Boolean(true),
+ new Byte("1"),
+ new Short("1"),
+ new Long(1),
+ new Float(1.0f),
+ new Double(1.0d),
+ new BinaryStringData("1"),
+ new BinaryStringData("1"),
+ DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent2));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect2));
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("3"),
+ null,
+ new Boolean(true),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ }));
+ DataChangeEvent insertEventExpect3 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("3"),
+ new Integer(1),
+ new Boolean(true),
+ new Byte("1"),
+ new Short("1"),
+ new Long(1),
+ new Float(1.0f),
+ new Double(1.0d),
+ new BinaryStringData("true"),
+ new BinaryStringData("true"),
+ DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent3));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect3));
+ DataChangeEvent insertEvent4 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"),
+ null,
+ null,
+ new Byte("1"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ }));
+ DataChangeEvent insertEventExpect4 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("4"),
+ new Integer(1),
+ new Boolean(true),
+ new Byte("1"),
+ new Short("1"),
+ new Long(1),
+ new Float(1.0f),
+ new Double(1.0d),
+ new BinaryStringData("1"),
+ new BinaryStringData("1"),
+ DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent4));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect4));
+ DataChangeEvent insertEvent5 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("5"),
+ null,
+ null,
+ null,
+ new Short("1"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ }));
+ DataChangeEvent insertEventExpect5 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("5"),
+ new Integer(1),
+ new Boolean(true),
+ new Byte("1"),
+ new Short("1"),
+ new Long(1),
+ new Float(1.0f),
+ new Double(1.0d),
+ new BinaryStringData("1"),
+ new BinaryStringData("1"),
+ DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent5));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect5));
+ DataChangeEvent insertEvent6 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("6"),
+ null,
+ null,
+ null,
+ null,
+ new Long(1),
+ null,
+ null,
+ null,
+ null,
+ null,
+ }));
+ DataChangeEvent insertEventExpect6 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("6"),
+ new Integer(1),
+ new Boolean(true),
+ new Byte("1"),
+ new Short("1"),
+ new Long(1),
+ new Float(1.0f),
+ new Double(1.0d),
+ new BinaryStringData("1"),
+ new BinaryStringData("1"),
+ DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent6));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect6));
+ DataChangeEvent insertEvent7 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("7"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ new Float(1.0f),
+ null,
+ null,
+ null,
+ null,
+ }));
+ DataChangeEvent insertEventExpect7 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("7"),
+ null,
+ new Boolean(true),
+ null,
+ null,
+ null,
+ new Float(1.0f),
+ new Double(1.0d),
+ new BinaryStringData("1.0"),
+ new BinaryStringData("1.0"),
+ DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent7));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect7));
+ DataChangeEvent insertEvent8 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("8"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ new Double(1.0d),
+ null,
+ null,
+ null,
+ }));
+ DataChangeEvent insertEventExpect8 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("8"),
+ null,
+ new Boolean(true),
+ null,
+ null,
+ null,
+ new Float(1.0f),
+ new Double(1.0d),
+ new BinaryStringData("1.0"),
+ new BinaryStringData("1.0"),
+ DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent8));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect8));
+ DataChangeEvent insertEvent9 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("9"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ }));
+ DataChangeEvent insertEventExpect9 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("9"),
+ null,
+ new Boolean(true),
+ null,
+ null,
+ null,
+ new Float(1.0f),
+ new Double(1.0d),
+ new BinaryStringData("1.00"),
+ new BinaryStringData("1.00"),
+ DecimalData.fromBigDecimal(new
BigDecimal(1.0), 4, 2),
+ }));
+ transform.processElement(new StreamRecord<>(insertEvent9));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect9));
+ }
+
+ @Test
+ void testCastErrorTransform() throws Exception {
+ TransformDataOperator transform =
+ TransformDataOperator.newBuilder()
+ .addTransform(
+ CAST_TABLEID.identifier(),
+ "col1"
+ + ",cast(castFloat as int) as castInt"
+ + ",cast(castFloat as boolean) as
castBoolean"
+ + ",cast(castFloat as tinyint) as
castTinyint"
+ + ",cast(castFloat as smallint) as
castSmallint"
+ + ",cast(castFloat as bigint) as
castBigint"
+ + ",cast(castFloat as float) as
castFloat"
+ + ",cast(castFloat as double) as
castDouble"
+ + ",cast(castFloat as char) as
castChar"
+ + ",cast(castFloat as varchar) as
castVarchar"
+ + ",cast(castFloat as DECIMAL(4,2)) as
castDecimal",
+ "col1 = '1'")
+ .build();
+ EventOperatorTestHarness<TransformDataOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ new EventOperatorTestHarness<>(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent = new CreateTableEvent(CAST_TABLEID,
CAST_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType)
CAST_SCHEMA.toRowDataType()));
+ // Insert
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ CAST_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ new Float(1.0f),
+ null,
+ null,
+ null,
+ null,
+ }));
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(new
CreateTableEvent(CAST_TABLEID, CAST_SCHEMA)));
+ Assertions.assertThatThrownBy(
+ () -> {
+ transform.processElement(new
StreamRecord<>(insertEvent1));
+ })
+ .isExactlyInstanceOf(RuntimeException.class)
+ .hasRootCauseInstanceOf(NumberFormatException.class)
+ .hasRootCauseMessage("For input string: \"1.0\"");
+ }
+
@Test
void testBuildInFunctionTransform() throws Exception {
testExpressionConditionTransform(
@@ -592,6 +1235,26 @@ public class TransformDataOperatorTest {
"case 1 when 1 then 'a' when 2 then 'b' else 'c' end = 'a'");
testExpressionConditionTransform("case col1 when '1' then true else
false end");
testExpressionConditionTransform("case when col1 = '1' then true else
false end");
+ testExpressionConditionTransform("cast(col1 as int) = 1");
+ testExpressionConditionTransform("cast('true' as boolean)");
+ testExpressionConditionTransform("cast(col1 as tinyint) = cast(1 as
tinyint)");
+ testExpressionConditionTransform("cast(col1 as smallint) = cast(1 as
smallint)");
+ testExpressionConditionTransform("cast(col1 as bigint) = cast(1 as
bigint)");
+ testExpressionConditionTransform("cast(col1 as float) = cast(1 as
float)");
+ testExpressionConditionTransform("cast(col1 as double) = cast(1 as
double)");
+ testExpressionConditionTransform("cast('1' as char) = '1'");
+ testExpressionConditionTransform("cast(col1 as varchar) = '1'");
+ testExpressionConditionTransform("cast(col1 as DECIMAL(4,2)) =
cast(1.0 as DECIMAL(4,2))");
+ testExpressionConditionTransform("cast(null as int) is null");
+ testExpressionConditionTransform("cast(null as boolean) is null");
+ testExpressionConditionTransform("cast(null as tinyint) is null");
+ testExpressionConditionTransform("cast(null as smallint) is null");
+ testExpressionConditionTransform("cast(null as bigint) is null");
+ testExpressionConditionTransform("cast(null as float) is null");
+ testExpressionConditionTransform("cast(null as double) is null");
+ testExpressionConditionTransform("cast(null as char) is null");
+ testExpressionConditionTransform("cast(null as varchar) is null");
+ testExpressionConditionTransform("cast(null as DECIMAL(4,2)) is null");
}
private void testExpressionConditionTransform(String expression) throws
Exception {
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 9dffeb84a..f81c4a92c 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -264,6 +264,34 @@ public class TransformParserTest {
testFilterExpression(
"case when id = 1 then 'a' when id = 2 then 'b' else 'c' end",
"(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" :
\"c\")");
+ testFilterExpression(
+ "case id when 1 then 'a' when 2 then 'b' else 'c' end",
+ "(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" :
\"c\")");
+ testFilterExpression(
+ "case when id = 1 then 'a' when id = 2 then 'b' else 'c' end",
+ "(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" :
\"c\")");
+ testFilterExpression("cast(id||'0' as int)", "castToInteger(concat(id,
\"0\"))");
+ testFilterExpression("cast(1 as string)", "castToString(1)");
+ testFilterExpression("cast(1 as boolean)", "castToBoolean(1)");
+ testFilterExpression("cast(1 as tinyint)", "castToByte(1)");
+ testFilterExpression("cast(1 as smallint)", "castToShort(1)");
+ testFilterExpression("cast(1 as bigint)", "castToLong(1)");
+ testFilterExpression("cast(1 as float)", "castToFloat(1)");
+ testFilterExpression("cast(1 as double)", "castToDouble(1)");
+ testFilterExpression("cast(1 as decimal)", "castToBigDecimal(1, 10,
0)");
+ testFilterExpression("cast(1 as char)", "castToString(1)");
+ testFilterExpression("cast(1 as varchar)", "castToString(1)");
+ testFilterExpression("cast(null as int)", "castToInteger(null)");
+ testFilterExpression("cast(null as string)", "castToString(null)");
+ testFilterExpression("cast(null as boolean)", "castToBoolean(null)");
+ testFilterExpression("cast(null as tinyint)", "castToByte(null)");
+ testFilterExpression("cast(null as smallint)", "castToShort(null)");
+ testFilterExpression("cast(null as bigint)", "castToLong(null)");
+ testFilterExpression("cast(null as float)", "castToFloat(null)");
+ testFilterExpression("cast(null as double)", "castToDouble(null)");
+ testFilterExpression("cast(null as decimal)", "castToBigDecimal(null,
10, 0)");
+ testFilterExpression("cast(null as char)", "castToString(null)");
+ testFilterExpression("cast(null as varchar)", "castToString(null)");
}
private void testFilterExpression(String expression, String
expressionExpect) {