yuxiqian commented on code in PR #3881:
URL: https://github.com/apache/flink-cdc/pull/3881#discussion_r1926245858
##########
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java:
##########
@@ -2184,6 +2184,284 @@ void testTransformWithLargeLiterals() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1,
before=[12th, 15, Oops, 2147483647, 2147483648, -2147483648, -2147483649,
1234567890123456789], after=[], op=DELETE, meta=()}");
}
+ @Test
+ void testFloorCeilAndRoundFunctionWithLiterals() throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+ TableId tableId = TableId.tableId("default_namespace",
"default_schema", "mytable1");
+ List<Event> events = generateFloorCeilAndRoundEvents(tableId);
+
+
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.singletonList(
+ new TransformDef(
+ "\\.*.\\.*.\\.*",
+ "*, "
+ + "CEIL(tinyint_col) AS
ceil_tinyint,"
+ + "CEIL(smallint_col) AS
ceil_smallint,"
+ + "CEIL(int_col) AS
ceil_int,"
+ + "CEIL(bigint_col) AS
ceil_bigint,"
+ + "CEIL(float_col) AS
ceil_float,"
+ + "CEIL(double_col) AS
ceil_double,"
+ + "CEIL(decimal_col) AS
ceil_decimal,"
+ + "CEILING(tinyint_col) AS
ceiling_tinyint,"
+ + "CEILING(smallint_col) AS
ceiling_smallint,"
+ + "CEILING(int_col) AS
ceiling_int,"
+ + "CEILING(bigint_col) AS
ceiling_bigint,"
+ + "CEILING(float_col) AS
ceiling_float,"
+ + "CEILING(double_col) AS
ceiling_double,"
+ + "CEILING(decimal_col) AS
ceiling_decimal,"
+ + "FLOOR(tinyint_col) AS
floor_tinyint,"
+ + "FLOOR(smallint_col) AS
floor_smallint,"
+ + "FLOOR(int_col) AS
floor_int,"
+ + "FLOOR(bigint_col) AS
floor_bigint,"
+ + "FLOOR(float_col) AS
floor_float,"
+ + "FLOOR(double_col) AS
floor_double,"
+ + "FLOOR(decimal_col) AS
floor_decimal,"
+ + "ROUND(tinyint_col, 2) AS
round_tinyint,"
+ + "ROUND(smallint_col, 2) AS
round_smallint,"
+ + "ROUND(int_col, 2) AS
round_int,"
+ + "ROUND(bigint_col, 2) AS
round_bigint,"
+ + "ROUND(float_col, 2) AS
round_float,"
+ + "ROUND(double_col, 2) AS
round_double,"
+ + "ROUND(decimal_col, 2) AS
round_decimal",
Review Comment:
Since another common use case is `ROUND(..., 0)`, maybe we could add
corresponding tests for it as well?
##########
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java:
##########
@@ -2184,6 +2184,284 @@ void testTransformWithLargeLiterals() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1,
before=[12th, 15, Oops, 2147483647, 2147483648, -2147483648, -2147483649,
1234567890123456789], after=[], op=DELETE, meta=()}");
}
+ @Test
+ void testFloorCeilAndRoundFunctionWithLiterals() throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+ TableId tableId = TableId.tableId("default_namespace",
"default_schema", "mytable1");
+ List<Event> events = generateFloorCeilAndRoundEvents(tableId);
+
+
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.singletonList(
+ new TransformDef(
+ "\\.*.\\.*.\\.*",
+ "*, "
+ + "CEIL(tinyint_col) AS
ceil_tinyint,"
+ + "CEIL(smallint_col) AS
ceil_smallint,"
+ + "CEIL(int_col) AS
ceil_int,"
+ + "CEIL(bigint_col) AS
ceil_bigint,"
+ + "CEIL(float_col) AS
ceil_float,"
+ + "CEIL(double_col) AS
ceil_double,"
+ + "CEIL(decimal_col) AS
ceil_decimal,"
+ + "CEILING(tinyint_col) AS
ceiling_tinyint,"
+ + "CEILING(smallint_col) AS
ceiling_smallint,"
+ + "CEILING(int_col) AS
ceiling_int,"
+ + "CEILING(bigint_col) AS
ceiling_bigint,"
+ + "CEILING(float_col) AS
ceiling_float,"
+ + "CEILING(double_col) AS
ceiling_double,"
+ + "CEILING(decimal_col) AS
ceiling_decimal,"
+ + "FLOOR(tinyint_col) AS
floor_tinyint,"
+ + "FLOOR(smallint_col) AS
floor_smallint,"
+ + "FLOOR(int_col) AS
floor_int,"
+ + "FLOOR(bigint_col) AS
floor_bigint,"
+ + "FLOOR(float_col) AS
floor_float,"
+ + "FLOOR(double_col) AS
floor_double,"
+ + "FLOOR(decimal_col) AS
floor_decimal,"
+ + "ROUND(tinyint_col, 2) AS
round_tinyint,"
+ + "ROUND(smallint_col, 2) AS
round_smallint,"
+ + "ROUND(int_col, 2) AS
round_int,"
+ + "ROUND(bigint_col, 2) AS
round_bigint,"
+ + "ROUND(float_col, 2) AS
round_float,"
+ + "ROUND(double_col, 2) AS
round_double,"
+ + "ROUND(decimal_col, 2) AS
round_decimal",
+ null,
+ "id",
+ null,
+ null,
+ null,
+ null)),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check the order and content of all received events
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+ assertThat(outputEvents)
+ .containsExactly(
+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT NOT NULL,`tinyint_col` TINYINT,`smallint_col`
SMALLINT,`int_col` INT,`bigint_col` BIGINT,`float_col` FLOAT,`double_col`
DOUBLE,`decimal_col` DECIMAL(10, 3),`ceil_tinyint` TINYINT,`ceil_smallint`
SMALLINT,`ceil_int` INT,`ceil_bigint` BIGINT,`ceil_float` FLOAT,`ceil_double`
DOUBLE,`ceil_decimal` DECIMAL(10, 0),`ceiling_tinyint`
TINYINT,`ceiling_smallint` SMALLINT,`ceiling_int` INT,`ceiling_bigint`
BIGINT,`ceiling_float` FLOAT,`ceiling_double` DOUBLE,`ceiling_decimal`
DECIMAL(10, 0),`floor_tinyint` TINYINT,`floor_smallint` SMALLINT,`floor_int`
INT,`floor_bigint` BIGINT,`floor_float` FLOAT,`floor_double`
DOUBLE,`floor_decimal` DECIMAL(10, 0),`round_tinyint` TINYINT,`round_smallint`
SMALLINT,`round_int` INT,`round_bigint` BIGINT,`round_float`
FLOAT,`round_double` DOUBLE,`round_decimal` DECIMAL(10, 2)}, primaryKeys=id,
options=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[1, 1, 1, 1, 1, 1.1, 1.1, 1.100, 1, 1, 1, 1, 2.0, 2.0, 2, 1, 1, 1, 1,
2.0, 2.0, 2, 1, 1, 1, 1, 1.0, 1.0, 1, 1, 1, 1, 1, 1.1, 1.1, 1.10], op=INSERT,
meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[4, 4, 4, 4, 4, 4.44, 4.44, 4.440, 4, 4, 4, 4, 5.0, 5.0, 5, 4, 4, 4, 4,
5.0, 5.0, 5, 4, 4, 4, 4, 4.0, 4.0, 4, 4, 4, 4, 4, 4.44, 4.44, 4.44], op=INSERT,
meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[5, 5, 5, 5, 5, 5.555, 5.555, 5.555, 5, 5, 5, 5, 6.0, 6.0, 6, 5, 5, 5, 5,
6.0, 6.0, 6, 5, 5, 5, 5, 5.0, 5.0, 5, 5, 5, 5, 5, 5.56, 5.56, 5.56], op=INSERT,
meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[9, 9, 9, 9, 9, 1.0E7, 9999999.999, 9999999.999, 9, 9, 9, 9, 1.0E7,
1.0E7, 10000000, 9, 9, 9, 9, 1.0E7, 1.0E7, 10000000, 9, 9, 9, 9, 1.0E7,
9999999.0, 9999999, 9, 9, 9, 9, 1.0E7, 1.0E7, 10000000.00], op=INSERT,
meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[0, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null, null, null], op=INSERT,
meta=()}");
+ }
+
+ @Test
+ void testAbsFunctionWithLiterals() throws Exception {
Review Comment:
```suggestion
void testAbsFunction() throws Exception {
```
##########
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java:
##########
@@ -2184,6 +2184,284 @@ void testTransformWithLargeLiterals() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1,
before=[12th, 15, Oops, 2147483647, 2147483648, -2147483648, -2147483649,
1234567890123456789], after=[], op=DELETE, meta=()}");
}
+ @Test
+ void testFloorCeilAndRoundFunctionWithLiterals() throws Exception {
Review Comment:
It seems these cases aren't testing function calls with literal arguments,
but rather with specific column values.
```suggestion
void testFloorCeilAndRoundFunctions() throws Exception {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]