yuxiqian commented on code in PR #3698:
URL: https://github.com/apache/flink-cdc/pull/3698#discussion_r1899933072


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -184,6 +231,53 @@ public static int timestampDiff(String symbol, long 
fromDate, long toDate) {
         }
     }
 
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, LocalZonedTimestampData 
timepoint) {
+        return timestampadd(timeintervalunit, interval, 
timepoint.getEpochMillisecond());
+    }
+
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, ZonedTimestampData 
timepoint) {
+        return timestampadd(timeintervalunit, interval, 
timepoint.getMillisecond());
+    }
+
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, TimestampData timepoint) {
+        return timestampadd(timeintervalunit, interval, 
timepoint.getMillisecond());
+    }
+
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, long timepoint) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(new Date(timepoint));
+        int field;
+        switch (timeintervalunit) {
+            case "SECOND":
+                field = Calendar.SECOND;
+                break;
+            case "MINUTE":
+                field = Calendar.MINUTE;
+                break;
+            case "HOUR":
+                field = Calendar.HOUR;
+                break;
+            case "DAY":
+                field = Calendar.DAY_OF_YEAR;
+                break;
+            case "MONTH":
+                field = Calendar.MONTH;
+                break;
+            case "YEAR":
+                field = Calendar.YEAR;
+                break;
+            default:
+                LOG.error("Unsupported timestamp add: {}", timeintervalunit);

Review Comment:
   I think it's not very meaningful to log here, since this code is evaluated 
in the Janino engine.



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java:
##########
@@ -732,12 +831,107 @@ void testTimestampDiffTransform() throws Exception {
                 DataChangeEvent.insertEvent(
                         TIMESTAMPDIFF_TABLEID,
                         recordDataGenerator.generate(
-                                new Object[] {new BinaryStringData("2"), null, 
null, null, null}));
+                                new Object[] {
+                                    new BinaryStringData("2"), null, null, 
null, null, null, null
+                                }));
         DataChangeEvent insertEventExpect2 =
                 DataChangeEvent.insertEvent(
                         TIMESTAMPDIFF_TABLEID,
                         recordDataGenerator.generate(
-                                new Object[] {new BinaryStringData("2"), 0, 0, 
0, 0}));
+                                new Object[] {new BinaryStringData("2"), 0, 0, 
0, 0, 0, 0}));
+
+        transform.processElement(new StreamRecord<>(insertEvent2));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect2));
+    }
+
+    @Test
+    void testTimestampaddTransform() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                TIMESTAMPADD_TABLEID.identifier(),
+                                "col1, DATE_FORMAT(TIMESTAMPADD(SECOND, 1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as second_add,"
+                                        + " DATE_FORMAT(TIMESTAMPADD(MINUTE, 
1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as minute_add,"
+                                        + " DATE_FORMAT(TIMESTAMPADD(HOUR, 1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as hour_add,"
+                                        + " DATE_FORMAT(TIMESTAMPADD(DAY, 1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as day_add,"
+                                        + " DATE_FORMAT(TIMESTAMPADD(MONTH, 1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as month_add,"
+                                        + " DATE_FORMAT(TIMESTAMPADD(YEAR, 1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as year_add",
+                                "col1='1'")
+                        .addTransform(
+                                TIMESTAMPADD_TABLEID.identifier(),
+                                "col1, DATE_FORMAT(TIMESTAMPADD(SECOND, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as second_add,"
+                                        + " DATE_FORMAT(TIMESTAMPADD(MINUTE, 
-1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as minute_add,"
+                                        + " DATE_FORMAT(TIMESTAMPADD(HOUR, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as hour_add,"
+                                        + " DATE_FORMAT(TIMESTAMPADD(DAY, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as day_add,"
+                                        + " DATE_FORMAT(TIMESTAMPADD(MONTH, 
-1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as month_add,"
+                                        + " DATE_FORMAT(TIMESTAMPADD(YEAR, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as year_add",
+                                "col1='2'")
+                        .addTimezone("UTC")
+                        .build();
+        EventOperatorTestHarness<PostTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(TIMESTAMPADD_TABLEID, 
TIMESTAMPADD_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) 
TIMESTAMPADD_SCHEMA.toRowDataType()));
+        // Insert
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        TIMESTAMPADD_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"), null, null, 
null, null, null, null
+                                }));
+        DataChangeEvent insertEventExpect =
+                DataChangeEvent.insertEvent(
+                        TIMESTAMPADD_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"),
+                                    new BinaryStringData("2024-10-01 
00:00:01"),
+                                    new BinaryStringData("2024-10-01 
00:01:00"),
+                                    new BinaryStringData("2024-10-01 
01:00:00"),
+                                    new BinaryStringData("2024-10-02 
00:00:00"),
+                                    new BinaryStringData("2024-11-01 
00:00:00"),
+                                    new BinaryStringData("2025-10-01 00:00:00")
+                                }));
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(TIMESTAMPADD_TABLEID, 
TIMESTAMPADD_SCHEMA)));
+        transform.processElement(new StreamRecord<>(insertEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect));
+
+        DataChangeEvent insertEvent2 =

Review Comment:
   Currently, all test data is hard-coded inside the test, and it's not easy to 
extend it to cover more cases. I wonder if organizing test cases in this way 
would be more effective?
   
   * Input schema has columns of types TIMESTAMP, TIMESTAMP_TZ, and TIMESTAMP_LTZ
   * Append multiple computed rows to utilize newly-added functions in 
transform rules
   * Insert multiple rows (with some extreme cases, like possibly out-of-range 
cases)
   
   For example, our input schema might be `TIMESTAMP ts`, and transform rule 
could be `TIMESTAMPADD(DAY, 1, ts) AS one_day_later, ...`.
   
   Now, after inserting original data `2020-01-01` and `2023-12-31`, we may get 
`2020-01-02` and `2024-01-01` as expected results.
   
   Anyway, it's just an example. Please consider adding more corner use cases 
here.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -184,6 +231,53 @@ public static int timestampDiff(String symbol, long 
fromDate, long toDate) {
         }
     }
 
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, LocalZonedTimestampData 
timepoint) {
+        return timestampadd(timeintervalunit, interval, 
timepoint.getEpochMillisecond());
+    }
+
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, ZonedTimestampData 
timepoint) {
+        return timestampadd(timeintervalunit, interval, 
timepoint.getMillisecond());
+    }
+
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, TimestampData timepoint) {
+        return timestampadd(timeintervalunit, interval, 
timepoint.getMillisecond());
+    }
+
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, long timepoint) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(new Date(timepoint));
+        int field;
+        switch (timeintervalunit) {
+            case "SECOND":
+                field = Calendar.SECOND;
+                break;
+            case "MINUTE":
+                field = Calendar.MINUTE;
+                break;
+            case "HOUR":
+                field = Calendar.HOUR;
+                break;
+            case "DAY":
+                field = Calendar.DAY_OF_YEAR;
+                break;
+            case "MONTH":
+                field = Calendar.MONTH;
+                break;
+            case "YEAR":
+                field = Calendar.YEAR;
+                break;
+            default:
+                LOG.error("Unsupported timestamp add: {}", timeintervalunit);
+                throw new RuntimeException("Unsupported timestamp add: " + 
timeintervalunit);

Review Comment:
   "Unsupported timestamp interval unit. Supported types are: SECOND, MINUTE, 
HOUR, ..."



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java:
##########
@@ -249,6 +249,8 @@ public void testTranslateFilterToJaninoExpression() {
                 "TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\", 
__time_zone__)");
         testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt, 
__time_zone__)");
         testFilterExpression("TIMESTAMP_DIFF('DAY', dt1, dt2)", 
"timestampDiff(\"DAY\", dt1, dt2)");
+        testFilterExpression("TIMESTAMPDIFF(DAY, dt1, dt2)", 
"timestampdiff(\"DAY\", dt1, dt2)");
+        testFilterExpression("TIMESTAMPADD(DAY, 1, dt)", 
"timestampadd(\"DAY\", 1, dt)");

Review Comment:
   Consider adding cases for all valid timestamp units (and invalid ones).



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -184,6 +231,53 @@ public static int timestampDiff(String symbol, long 
fromDate, long toDate) {
         }
     }
 
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, LocalZonedTimestampData 
timepoint) {
+        return timestampadd(timeintervalunit, interval, 
timepoint.getEpochMillisecond());
+    }
+
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, ZonedTimestampData 
timepoint) {
+        return timestampadd(timeintervalunit, interval, 
timepoint.getMillisecond());
+    }
+
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, TimestampData timepoint) {
+        return timestampadd(timeintervalunit, interval, 
timepoint.getMillisecond());
+    }
+
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, long timepoint) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(new Date(timepoint));
+        int field;
+        switch (timeintervalunit) {

Review Comment:
   Maybe we can downcase the unit first?



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -184,6 +231,53 @@ public static int timestampDiff(String symbol, long 
fromDate, long toDate) {
         }
     }
 
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, LocalZonedTimestampData 
timepoint) {
+        return timestampadd(timeintervalunit, interval, 
timepoint.getEpochMillisecond());
+    }
+
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, ZonedTimestampData 
timepoint) {
+        return timestampadd(timeintervalunit, interval, 
timepoint.getMillisecond());
+    }
+
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, TimestampData timepoint) {
+        return timestampadd(timeintervalunit, interval, 
timepoint.getMillisecond());
+    }
+
+    public static TimestampData timestampadd(
+            String timeintervalunit, int interval, long timepoint) {

Review Comment:
   Use camel-case names for parameters, like `timeIntervalUnit`. Please also 
check other newly-added functions' parameters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to