This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 551bdf395 [FLINK-36636][transform] Supports timestamp comparison in cdc pipeline transform
551bdf395 is described below

commit 551bdf395daf33588f5df86dd8450d23a844128e
Author: Wink <809097...@qq.com>
AuthorDate: Tue Jan 14 21:43:21 2025 +0800

    [FLINK-36636][transform] Supports timestamp comparison in cdc pipeline transform
    
    This closes #3677
---
 docs/content.zh/docs/core-concept/transform.md     |   8 +-
 docs/content/docs/core-concept/transform.md        |  30 +--
 .../cdc/runtime/functions/SystemFunctionUtils.java |  44 +++++
 .../flink/cdc/runtime/parser/JaninoCompiler.java   |  30 ++-
 .../transform/PostTransformOperatorTest.java       | 219 +++++++++++++++++++++
 .../cdc/runtime/parser/TransformParserTest.java    |  29 +--
 6 files changed, 326 insertions(+), 34 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index dfa90a728..ec88aeadf 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -92,10 +92,10 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
 |----------------------|-----------------------------|-----------------------------------------------------------------|
 | value1 = value2      | valueEquals(value1, value2) | Returns TRUE if value1 is equal to value2; returns FALSE if value1 or value2 is NULL. |
 | value1 <> value2     | !valueEquals(value1, value2) | Returns TRUE if value1 is not equal to value2; returns FALSE if value1 or value2 is NULL. |
-| value1 > value2      | value1 > value2             | Returns TRUE if value1 is greater than value2; returns FALSE if value1 or value2 is NULL. |
-| value1 >= value2     | value1 >= value2            | Returns TRUE if value1 is greater than or equal to value2; returns FALSE if value1 or value2 is NULL. |
-| value1 < value2      | value1 < value2             | Returns TRUE if value1 is less than value2; returns FALSE if value1 or value2 is NULL. |
-| value1 <= value2     | value1 <= value2            | Returns TRUE if value1 is less than or equal to value2; returns FALSE if value1 or value2 is NULL. |
+| value1 > value2      | greaterThan(value1, value2)     | Returns TRUE if value1 is greater than value2; returns FALSE if value1 or value2 is NULL. |
+| value1 >= value2     | greaterThanOrEqual(value1, value2) | Returns TRUE if value1 is greater than or equal to value2; returns FALSE if value1 or value2 is NULL. |
+| value1 < value2      | lessThan(value1, value2)        | Returns TRUE if value1 is less than value2; returns FALSE if value1 or value2 is NULL. |
+| value1 <= value2     | lessThanOrEqual(value1, value2) | Returns TRUE if value1 is less than or equal to value2; returns FALSE if value1 or value2 is NULL. |
 | value IS NULL        | null == value               | Returns TRUE if value is NULL.                                  |
 | value IS NOT NULL    | null != value               | Returns TRUE if value is not NULL.                              |
 | value1 BETWEEN value2 AND value3 | betweenAsymmetric(value1, value2, value3) | Returns TRUE if value1 is greater than or equal to value2 and less than or equal to value3. |
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index 67eeaf2d8..388b0ff46 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -88,22 +88,22 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
 
 ## Comparison Functions
 
-| Function             | Janino Code                 | Description                                                     |
-|----------------------|-----------------------------|-----------------------------------------------------------------|
-| value1 = value2      | valueEquals(value1, value2) | Returns TRUE if value1 is equal to value2; returns FALSE if value1 or value2 is NULL. |
-| value1 <> value2     | !valueEquals(value1, value2) | Returns TRUE if value1 is not equal to value2; returns FALSE if value1 or value2 is NULL. |
-| value1 > value2      | value1 > value2             | Returns TRUE if value1 is greater than value2; returns FALSE if value1 or value2 is NULL. |
-| value1 >= value2     | value1 >= value2            | Returns TRUE if value1 is greater than or equal to value2; returns FALSE if value1 or value2 is NULL. |
-| value1 < value2      | value1 < value2             | Returns TRUE if value1 is less than value2; returns FALSE if value1 or value2 is NULL. |
-| value1 <= value2     | value1 <= value2            | Returns TRUE if value1 is less than or equal to value2; returns FALSE if value1 or value2 is NULL. |
-| value IS NULL        | null == value               | Returns TRUE if value is NULL.                                  |
-| value IS NOT NULL    | null != value               | Returns TRUE if value is not NULL.                              |
-| value1 BETWEEN value2 AND value3 | betweenAsymmetric(value1, value2, value3) | Returns TRUE if value1 is greater than or equal to value2 and less than or equal to value3. |
+| Function             | Janino Code                                  | Description                                                     |
+|----------------------|----------------------------------------------|-----------------------------------------------------------------|
+| value1 = value2      | valueEquals(value1, value2)                  | Returns TRUE if value1 is equal to value2; returns FALSE if value1 or value2 is NULL. |
+| value1 <> value2     | !valueEquals(value1, value2)                 | Returns TRUE if value1 is not equal to value2; returns FALSE if value1 or value2 is NULL. |
+| value1 > value2      | greaterThan(value1, value2)                  | Returns TRUE if value1 is greater than value2; returns FALSE if value1 or value2 is NULL. |
+| value1 >= value2     | greaterThanOrEqual(value1, value2)               | Returns TRUE if value1 is greater than or equal to value2; returns FALSE if value1 or value2 is NULL. |
+| value1 < value2      | lessThan(value1, value2)                         | Returns TRUE if value1 is less than value2; returns FALSE if value1 or value2 is NULL. |
+| value1 <= value2     | lessThanOrEqual(value1, value2)                  | Returns TRUE if value1 is less than or equal to value2; returns FALSE if value1 or value2 is NULL. |
+| value IS NULL        | null == value                                | Returns TRUE if value is NULL.                                  |
+| value IS NOT NULL    | null != value                                | Returns TRUE if value is not NULL.                              |
+| value1 BETWEEN value2 AND value3 | betweenAsymmetric(value1, value2, value3)    | Returns TRUE if value1 is greater than or equal to value2 and less than or equal to value3. |
 | value1 NOT BETWEEN value2 AND value3 | notBetweenAsymmetric(value1, value2, value3) | Returns TRUE if value1 is less than value2 or greater than value3. |
-| string1 LIKE string2 | like(string1, string2)      | Returns TRUE if string1 matches pattern string2.                |
-| string1 NOT LIKE string2 | notLike(string1, string2) | Returns TRUE if string1 does not match pattern string2.       |
-| value1 IN (value2 [, value3]* ) | in(value1, value2 [, value3]*) | Returns TRUE if value1 exists in the given list (value2, value3, …). |
-| value1 NOT IN (value2 [, value3]* ) | notIn(value1, value2 [, value3]*) | Returns TRUE if value1 does not exist in the given list (value2, value3, …).  |
+| string1 LIKE string2 | like(string1, string2)                       | Returns TRUE if string1 matches pattern string2.                |
+| string1 NOT LIKE string2 | notLike(string1, string2)                    | Returns TRUE if string1 does not match pattern string2.       |
+| value1 IN (value2 [, value3]* ) | in(value1, value2 [, value3]*)               | Returns TRUE if value1 exists in the given list (value2, value3, …). |
+| value1 NOT IN (value2 [, value3]* ) | notIn(value1, value2 [, value3]*)            | Returns TRUE if value1 does not exist in the given list (value2, value3, …).  |
 
 ## Logical Functions
 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index a7555203c..4a2674478 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -867,4 +867,48 @@ public class SystemFunctionUtils {
         }
         return String.valueOf(object);
     }
+
+    private static int universalCompares(Object lhs, Object rhs) {
+        Class<?> leftClass = lhs.getClass();
+        Class<?> rightClass = rhs.getClass();
+        if (leftClass.equals(rightClass) && lhs instanceof Comparable) {
+            return ((Comparable) lhs).compareTo(rhs);
+        } else if (lhs instanceof Number && rhs instanceof Number) {
+            return Double.compare(((Number) lhs).doubleValue(), ((Number) rhs).doubleValue());
+        } else {
+            throw new RuntimeException(
+                    "Comparison of unsupported data types: "
+                            + leftClass.getName()
+                            + " and "
+                            + rightClass.getName());
+        }
+    }
+
+    public static boolean greaterThan(Object lhs, Object rhs) {
+        if (lhs == null || rhs == null) {
+            return false;
+        }
+        return universalCompares(lhs, rhs) > 0;
+    }
+
+    public static boolean greaterThanOrEqual(Object lhs, Object rhs) {
+        if (lhs == null || rhs == null) {
+            return false;
+        }
+        return universalCompares(lhs, rhs) >= 0;
+    }
+
+    public static boolean lessThan(Object lhs, Object rhs) {
+        if (lhs == null || rhs == null) {
+            return false;
+        }
+        return universalCompares(lhs, rhs) < 0;
+    }
+
+    public static boolean lessThanOrEqual(Object lhs, Object rhs) {
+        if (lhs == null || rhs == null) {
+            return false;
+        }
+        return universalCompares(lhs, rhs) <= 0;
+    }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index f60bf968c..b122aded8 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -272,7 +272,7 @@ public class JaninoCompiler {
             case GREATER_THAN:
             case LESS_THAN_OR_EQUAL:
             case GREATER_THAN_OR_EQUAL:
-                return generateBinaryOperation(sqlBasicCall, atoms, sqlBasicCall.getKind().sql);
+                return generateCompareOperation(sqlBasicCall, atoms);
             case CAST:
                 return generateCastOperation(sqlBasicCall, atoms);
             case OTHER:
@@ -313,6 +313,34 @@ public class JaninoCompiler {
         return generateTypeConvertMethod(sqlDataTypeSpec, atoms);
     }
 
+    private static Java.Rvalue generateCompareOperation(
+            SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+        if (atoms.length != 2) {
+            throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
+        }
+        String compareMethodName;
+        switch (sqlBasicCall.getKind()) {
+            case LESS_THAN:
+                compareMethodName = "LESS_THAN";
+                break;
+            case GREATER_THAN:
+                compareMethodName = "GREATER_THAN";
+                break;
+            case LESS_THAN_OR_EQUAL:
+                compareMethodName = "LESS_THAN_OR_EQUAL";
+                break;
+            case GREATER_THAN_OR_EQUAL:
+                compareMethodName = "GREATER_THAN_OR_EQUAL";
+                break;
+            default:
+                throw new ParseException(
+                        "Unsupported binary relation operator: "
+                                + sqlBasicCall.getKind().toString());
+        }
+        return new Java.MethodInvocation(
+                Location.NOWHERE, null, StringUtils.convertToCamelCase(compareMethodName), atoms);
+    }
+
     private static Java.Rvalue generateOtherOperation(
             SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
         if (sqlBasicCall.getOperator().getName().equals("||")) {
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index f30c19946..f744967ba 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -32,6 +32,8 @@ import org.apache.flink.cdc.runtime.testutils.operators.RegularEventOperatorTest
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.sql.validate.SqlValidatorException;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
@@ -276,6 +278,38 @@ public class PostTransformOperatorTest {
                     .primaryKey("col1")
                     .build();
 
+    private static final TableId COMPARE_TABLEID =
+            TableId.tableId("my_company", "my_branch", "compare_table");
+    private static final Schema COMPARE_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
+                    .physicalColumn("numerical_equal", DataTypes.BOOLEAN())
+                    .physicalColumn("string_equal", DataTypes.BOOLEAN())
+                    .physicalColumn("time_equal", DataTypes.BOOLEAN())
+                    .physicalColumn("timestamp_equal", DataTypes.BOOLEAN())
+                    .physicalColumn("date_equal", DataTypes.BOOLEAN())
+                    .primaryKey("col1")
+                    .build();
+
+    private static final TableId COMPARE_DATA_TABLEID =
+            TableId.tableId("my_company", "my_branch", "compare_data_table");
+    private static final Schema COMPARE_DATA_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.INT().notNull())
+                    .physicalColumn("c1", DataTypes.FLOAT().nullable())
+                    .physicalColumn("c2", DataTypes.DOUBLE().nullable())
+                    .physicalColumn("c3", DataTypes.TIMESTAMP().nullable())
+                    .primaryKey("id")
+                    .build();
+    private static final Schema EXPECTD_COMPARE_DATA_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.INT().notNull())
+                    .physicalColumn("float_equal", DataTypes.BOOLEAN())
+                    .physicalColumn("double_equal", DataTypes.BOOLEAN())
+                    .physicalColumn("timestamp_equal", DataTypes.BOOLEAN())
+                    .primaryKey("id")
+                    .build();
+
     @Test
     void testDataChangeEventTransform() throws Exception {
         PostTransformOperator transform =
@@ -1961,6 +1995,191 @@ public class PostTransformOperatorTest {
         transformFunctionEventEventOperatorTestHarness.close();
     }
 
+    @Test
+    void testCompareTransform() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                COMPARE_TABLEID.identifier(),
+                                "col1, 2.1 > 1 as numerical_equal,"
+                                        + " '2024-01-01 00:00:00' < '2024-08-01 00:00:00' as string_equal,"
+                                        + " LOCALTIME <= CURRENT_TIME as time_equal,"
+                                        + " TO_TIMESTAMP('2024-01-01 00:00:00') <= TO_TIMESTAMP('2024-08-01 00:00:00') as timestamp_equal,"
+                                        + " TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) >= TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) as date_equal",
+                                "2 > 1")
+                        .addTimezone("UTC")
+                        .build();
+        RegularEventOperatorTestHarness<PostTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        RegularEventOperatorTestHarness.with(transform, 1);
+
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent = new 
CreateTableEvent(COMPARE_TABLEID, COMPARE_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) 
COMPARE_SCHEMA.toRowDataType()));
+        // Insert
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        COMPARE_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"), null, null, 
null, null, null
+                                }));
+        DataChangeEvent insertEventExpect =
+                DataChangeEvent.insertEvent(
+                        COMPARE_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"), true, true, 
true, true, true
+                                }));
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(new 
CreateTableEvent(COMPARE_TABLEID, COMPARE_SCHEMA)));
+        transform.processElement(new StreamRecord<>(insertEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect));
+    }
+
+    @Test
+    void testCompareErrorTransform() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                COMPARE_TABLEID.identifier(),
+                                "col1, 2.1 > TO_TIMESTAMP('2024-01-01 00:00:00') as numerical_equal,"
+                                        + " '2024-01-01 00:00:00' < '2024-08-01 00:00:00' as string_equal,"
+                                        + " LOCALTIME <= CURRENT_TIME as time_equal,"
+                                        + " TO_TIMESTAMP('2024-01-01 00:00:00') <= TO_TIMESTAMP('2024-08-01 00:00:00') as timestamp_equal,"
+                                        + " TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) >= TO_DATE(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')) as date_equal",
+                                "2 > 1")
+                        .addTimezone("UTC")
+                        .build();
+        RegularEventOperatorTestHarness<PostTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        RegularEventOperatorTestHarness.with(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent = new 
CreateTableEvent(COMPARE_TABLEID, COMPARE_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) 
COMPARE_SCHEMA.toRowDataType()));
+        // Insert
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        COMPARE_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"), null, null, 
null, null, null
+                                }));
+        DataChangeEvent insertEventExpect =
+                DataChangeEvent.insertEvent(
+                        COMPARE_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"), true, true, 
true, true, true
+                                }));
+        Assertions.assertThatThrownBy(
+                        () -> {
+                            transform.processElement(new 
StreamRecord<>(createTableEvent));
+                        })
+                .isExactlyInstanceOf(CalciteContextException.class)
+                .hasRootCauseInstanceOf(SqlValidatorException.class)
+                .hasRootCauseMessage(
+                        "Cannot apply '>' to arguments of type '<DECIMAL(2, 1)> > <TIMESTAMP(3)>'. Supported form(s): '<COMPARABLE_TYPE> > <COMPARABLE_TYPE>'");
+    }
+
+    @Test
+    void testCompareDataTransform() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                COMPARE_DATA_TABLEID.identifier(),
+                                "id, c1 < 5 as float_equal, c2 > 2.5 as double_equal, c3 <= TO_TIMESTAMP('2024-01-01 00:00:00') as timestamp_equal",
+                                null)
+                        .addTimezone("UTC")
+                        .build();
+        RegularEventOperatorTestHarness<PostTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        RegularEventOperatorTestHarness.with(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(COMPARE_DATA_TABLEID, 
COMPARE_DATA_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) 
COMPARE_DATA_SCHEMA.toRowDataType()));
+        BinaryRecordDataGenerator expectRecordDataGenerator =
+                new BinaryRecordDataGenerator(
+                        ((RowType) 
EXPECTD_COMPARE_DATA_SCHEMA.toRowDataType()));
+        // Insert
+        DataChangeEvent insertEvent1 =
+                DataChangeEvent.insertEvent(
+                        COMPARE_DATA_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    1,
+                                    new Float(4f),
+                                    new Double(3.5d),
+                                    TimestampData.fromMillis(1672502400000L)
+                                }));
+        DataChangeEvent insertEventExpect1 =
+                DataChangeEvent.insertEvent(
+                        COMPARE_DATA_TABLEID,
+                        expectRecordDataGenerator.generate(new Object[] {1, 
true, true, true}));
+
+        DataChangeEvent insertEvent2 =
+                DataChangeEvent.insertEvent(
+                        COMPARE_DATA_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    2,
+                                    new Float(10f),
+                                    new Double(0d),
+                                    TimestampData.fromMillis(1730390400000L)
+                                }));
+        DataChangeEvent insertEventExpect2 =
+                DataChangeEvent.insertEvent(
+                        COMPARE_DATA_TABLEID,
+                        expectRecordDataGenerator.generate(new Object[] {2, 
false, false, false}));
+
+        DataChangeEvent insertEvent3 =
+                DataChangeEvent.insertEvent(
+                        COMPARE_DATA_TABLEID,
+                        recordDataGenerator.generate(new Object[] {3, null, 
null, null}));
+        DataChangeEvent insertEventExpect3 =
+                DataChangeEvent.insertEvent(
+                        COMPARE_DATA_TABLEID,
+                        expectRecordDataGenerator.generate(new Object[] {3, 
false, false, false}));
+
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(
+                                        COMPARE_DATA_TABLEID, 
EXPECTD_COMPARE_DATA_SCHEMA)));
+
+        transform.processElement(new StreamRecord<>(insertEvent1));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect1));
+
+        transform.processElement(new StreamRecord<>(insertEvent2));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect2));
+
+        transform.processElement(new StreamRecord<>(insertEvent3));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect3));
+    }
+
     @Test
     void testBuildInFunctionTransform() throws Exception {
         testExpressionConditionTransform(
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 478d4e92e..3c5aae6db 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -263,7 +263,7 @@ public class TransformParserTest {
                 "TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\", 
__time_zone__)");
         testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt, 
__time_zone__)");
         testFilterExpression("TIMESTAMP_DIFF('DAY', dt1, dt2)", 
"timestampDiff(\"DAY\", dt1, dt2)");
-        testFilterExpression("IF(a>b,a,b)", "a > b ? a : b");
+        testFilterExpression("IF(a>b,a,b)", "greaterThan(a, b) ? a : b");
         testFilterExpression("NULLIF(a,b)", "nullif(a, b)");
         testFilterExpression("COALESCE(a,b,c)", "coalesce(a, b, c)");
         testFilterExpression("id + 2", "id + 2");
@@ -271,17 +271,18 @@ public class TransformParserTest {
         testFilterExpression("id * 2", "id * 2");
         testFilterExpression("id / 2", "id / 2");
         testFilterExpression("id % 2", "id % 2");
-        testFilterExpression("a < b", "a < b");
-        testFilterExpression("a <= b", "a <= b");
-        testFilterExpression("a > b", "a > b");
-        testFilterExpression("a >= b", "a >= b");
+        testFilterExpression("a < b", "lessThan(a, b)");
+        testFilterExpression("a <= b", "lessThanOrEqual(a, b)");
+        testFilterExpression("a > b", "greaterThan(a, b)");
+        testFilterExpression("a >= b", "greaterThanOrEqual(a, b)");
         testFilterExpression("__table_name__ = 'tb'", 
"valueEquals(__table_name__, \"tb\")");
         testFilterExpression("__schema_name__ = 'tb'", 
"valueEquals(__schema_name__, \"tb\")");
         testFilterExpression(
                 "__namespace_name__ = 'tb'", "valueEquals(__namespace_name__, 
\"tb\")");
         testFilterExpression("upper(lower(id))", "upper(lower(id))");
         testFilterExpression(
-                "abs(uniq_id) > 10 and id is not null", "abs(uniq_id) > 10 && 
null != id");
+                "abs(uniq_id) > 10 and id is not null",
+                "greaterThan(abs(uniq_id), 10) && null != id");
         testFilterExpression(
                 "case id when 1 then 'a' when 2 then 'b' else 'c' end",
                 "(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" : 
\"c\")");
@@ -474,23 +475,23 @@ public class TransformParserTest {
                 "typeof(id % 2)", "__instanceOfTypeOfFunctionClass.eval(id % 
2)");
         testFilterExpressionWithUdf(
                 "addone(addone(id)) > 4 OR typeof(id) <> 'bool' AND 
format('from %s to %s is %s', 'a', 'z', 'lie') <> ''",
-                
"__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval(id)) 
> 4 || !valueEquals(__instanceOfTypeOfFunctionClass.eval(id), \"bool\") && 
!valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", 
\"a\", \"z\", \"lie\"), \"\")");
+                
"greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval(id)),
 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval(id), \"bool\") && 
!valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", 
\"a\", \"z\", \"lie\"), \"\")");
         testFilterExpressionWithUdf(
                 "ADDONE(ADDONE(id)) > 4 OR TYPEOF(id) <> 'bool' AND 
FORMAT('from %s to %s is %s', 'a', 'z', 'lie') <> ''",
-                
"__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval(id)) 
> 4 || !valueEquals(__instanceOfTypeOfFunctionClass.eval(id), \"bool\") && 
!valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", 
\"a\", \"z\", \"lie\"), \"\")");
+                
"greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval(id)),
 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval(id), \"bool\") && 
!valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", 
\"a\", \"z\", \"lie\"), \"\")");
     }
 
     @Test
     void testLargeNumericalLiterals() {
         // For literals within [-2147483648, 2147483647] range, plain Integers 
are OK
-        testFilterExpression("id > 2147483647", "id > 2147483647");
-        testFilterExpression("id < -2147483648", "id < -2147483648");
+        testFilterExpression("id > 2147483647", "greaterThan(id, 2147483647)");
+        testFilterExpression("id < -2147483648", "lessThan(id, -2147483648)");
 
         // For out-of-range literals, an extra `L` suffix is required
-        testFilterExpression("id > 2147483648", "id > 2147483648L");
-        testFilterExpression("id > -2147483649", "id > -2147483649L");
-        testFilterExpression("id < 9223372036854775807", "id < 
9223372036854775807L");
-        testFilterExpression("id > -9223372036854775808", "id > 
-9223372036854775808L");
+        testFilterExpression("id > 2147483648", "greaterThan(id, 
2147483648L)");
+        testFilterExpression("id > -2147483649", "greaterThan(id, 
-2147483649L)");
+        testFilterExpression("id < 9223372036854775807", "lessThan(id, 
9223372036854775807L)");
+        testFilterExpression("id > -9223372036854775808", "greaterThan(id, 
-9223372036854775808L)");
 
         // But there's still a limit
         Assertions.assertThatThrownBy(

Reply via email to