yuxiqian commented on code in PR #3698:
URL: https://github.com/apache/flink-cdc/pull/3698#discussion_r1913993565


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -108,83 +108,203 @@ public static TimestampData toTimestamp(String str, 
String format, String timezo
         }
     }
 
-    public static int timestampDiff(
-            String symbol,
+    public static Integer timestampDiff(
+            String timeIntervalUnit,
             LocalZonedTimestampData fromTimestamp,
-            LocalZonedTimestampData toTimestamp) {
+            LocalZonedTimestampData toTimestamp,
+            String timezone) {
+        if (fromTimestamp == null || toTimestamp == null) {
+            return null;
+        }
         return timestampDiff(
-                symbol, fromTimestamp.getEpochMillisecond(), 
toTimestamp.getEpochMillisecond());
+                timeIntervalUnit,
+                fromTimestamp.getEpochMillisecond(),
+                timezone,
+                toTimestamp.getEpochMillisecond(),
+                timezone);
     }
 
-    public static int timestampDiff(
-            String symbol, TimestampData fromTimestamp, TimestampData 
toTimestamp) {
-        return timestampDiff(symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getMillisecond());
+    public static Integer timestampdiff(
+            String timeIntervalUnit,
+            LocalZonedTimestampData fromTimestamp,
+            LocalZonedTimestampData toTimestamp,
+            String timezone) {
+        return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, 
timezone);
     }
 
-    public static int timestampDiff(
-            String symbol, TimestampData fromTimestamp, 
LocalZonedTimestampData toTimestamp) {
+    public static Integer timestampDiff(
+            String timeIntervalUnit,
+            TimestampData fromTimestamp,
+            TimestampData toTimestamp,
+            String timezone) {
+        if (fromTimestamp == null || toTimestamp == null) {
+            return null;
+        }
         return timestampDiff(
-                symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getEpochMillisecond());
-    }
-
-    public static int timestampDiff(
-            String symbol, LocalZonedTimestampData fromTimestamp, 
TimestampData toTimestamp) {
+                timeIntervalUnit,
+                fromTimestamp.getMillisecond(),
+                "UTC",
+                toTimestamp.getMillisecond(),
+                "UTC");
+    }
+
+    public static Integer timestampdiff(
+            String timeIntervalUnit,
+            TimestampData fromTimestamp,
+            TimestampData toTimestamp,
+            String timezone) {
+        return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, 
timezone);
+    }
+
+    public static Integer timestampDiff(
+            String timeIntervalUnit,
+            TimestampData fromTimestamp,
+            LocalZonedTimestampData toTimestamp,
+            String timezone) {
+        if (fromTimestamp == null || toTimestamp == null) {
+            return null;
+        }
         return timestampDiff(
-                symbol, fromTimestamp.getEpochMillisecond(), 
toTimestamp.getMillisecond());
+                timeIntervalUnit,
+                fromTimestamp.getMillisecond(),
+                "UTC",
+                toTimestamp.getEpochMillisecond(),
+                timezone);
     }
 
-    public static int timestampDiff(
-            String symbol, ZonedTimestampData fromTimestamp, 
ZonedTimestampData toTimestamp) {
-        return timestampDiff(symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getMillisecond());
+    public static Integer timestampdiff(
+            String timeIntervalUnit,
+            TimestampData fromTimestamp,
+            LocalZonedTimestampData toTimestamp,
+            String timezone) {
+        return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, 
timezone);
     }
 
-    public static int timestampDiff(
-            String symbol, LocalZonedTimestampData fromTimestamp, 
ZonedTimestampData toTimestamp) {
+    public static Integer timestampDiff(
+            String timeIntervalUnit,
+            LocalZonedTimestampData fromTimestamp,
+            TimestampData toTimestamp,
+            String timezone) {
+        if (fromTimestamp == null || toTimestamp == null) {
+            return null;
+        }
         return timestampDiff(
-                symbol, fromTimestamp.getEpochMillisecond(), 
toTimestamp.getMillisecond());
+                timeIntervalUnit,
+                fromTimestamp.getEpochMillisecond(),
+                timezone,
+                toTimestamp.getMillisecond(),
+                "UTC");
     }
 
-    public static int timestampDiff(
-            String symbol, ZonedTimestampData fromTimestamp, 
LocalZonedTimestampData toTimestamp) {
-        return timestampDiff(
-                symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getEpochMillisecond());
+    public static Integer timestampdiff(
+            String timeIntervalUnit,
+            LocalZonedTimestampData fromTimestamp,
+            TimestampData toTimestamp,
+            String timezone) {
+        return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, 
timezone);
+    }
+
+    public static Integer timestampDiff(

Review Comment:
   nit: Will this method be directly invoked by generated expressions? If not, 
maybe we can mark it `private` and rename it (or move it to 
`DateTimeUtils`) to avoid adding yet another `timestampDiff` overload to this class.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java:
##########
@@ -129,6 +132,8 @@ public static RelDataType convertCalciteType(
                 return 
typeFactory.createSqlType(SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 return typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+            case TIMESTAMP_WITH_TIME_ZONE:

Review Comment:
   nit: maybe add a TODO comment referencing the JIRA ticket here



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -108,83 +108,203 @@ public static TimestampData toTimestamp(String str, 
String format, String timezo
         }
     }
 
-    public static int timestampDiff(
-            String symbol,
+    public static Integer timestampDiff(
+            String timeIntervalUnit,
             LocalZonedTimestampData fromTimestamp,
-            LocalZonedTimestampData toTimestamp) {
+            LocalZonedTimestampData toTimestamp,
+            String timezone) {
+        if (fromTimestamp == null || toTimestamp == null) {
+            return null;
+        }
         return timestampDiff(
-                symbol, fromTimestamp.getEpochMillisecond(), 
toTimestamp.getEpochMillisecond());
+                timeIntervalUnit,
+                fromTimestamp.getEpochMillisecond(),
+                timezone,
+                toTimestamp.getEpochMillisecond(),
+                timezone);
     }
 
-    public static int timestampDiff(
-            String symbol, TimestampData fromTimestamp, TimestampData 
toTimestamp) {
-        return timestampDiff(symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getMillisecond());
+    public static Integer timestampdiff(
+            String timeIntervalUnit,
+            LocalZonedTimestampData fromTimestamp,
+            LocalZonedTimestampData toTimestamp,
+            String timezone) {
+        return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, 
timezone);
     }
 
-    public static int timestampDiff(
-            String symbol, TimestampData fromTimestamp, 
LocalZonedTimestampData toTimestamp) {
+    public static Integer timestampDiff(
+            String timeIntervalUnit,
+            TimestampData fromTimestamp,
+            TimestampData toTimestamp,
+            String timezone) {
+        if (fromTimestamp == null || toTimestamp == null) {
+            return null;
+        }
         return timestampDiff(
-                symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getEpochMillisecond());
-    }
-
-    public static int timestampDiff(
-            String symbol, LocalZonedTimestampData fromTimestamp, 
TimestampData toTimestamp) {
+                timeIntervalUnit,
+                fromTimestamp.getMillisecond(),
+                "UTC",
+                toTimestamp.getMillisecond(),
+                "UTC");
+    }
+
+    public static Integer timestampdiff(
+            String timeIntervalUnit,
+            TimestampData fromTimestamp,
+            TimestampData toTimestamp,
+            String timezone) {
+        return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, 
timezone);
+    }
+
+    public static Integer timestampDiff(
+            String timeIntervalUnit,
+            TimestampData fromTimestamp,
+            LocalZonedTimestampData toTimestamp,
+            String timezone) {
+        if (fromTimestamp == null || toTimestamp == null) {
+            return null;
+        }
         return timestampDiff(
-                symbol, fromTimestamp.getEpochMillisecond(), 
toTimestamp.getMillisecond());
+                timeIntervalUnit,
+                fromTimestamp.getMillisecond(),
+                "UTC",
+                toTimestamp.getEpochMillisecond(),
+                timezone);
     }
 
-    public static int timestampDiff(
-            String symbol, ZonedTimestampData fromTimestamp, 
ZonedTimestampData toTimestamp) {
-        return timestampDiff(symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getMillisecond());
+    public static Integer timestampdiff(
+            String timeIntervalUnit,
+            TimestampData fromTimestamp,
+            LocalZonedTimestampData toTimestamp,
+            String timezone) {
+        return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, 
timezone);
     }

Review Comment:
   suggestion: since `timestampdiff` is merely a wrapper around `timestampDiff`, 
we can group the proxy methods in a single place and add comments.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -108,83 +108,203 @@ public static TimestampData toTimestamp(String str, 
String format, String timezo
         }
     }
 
-    public static int timestampDiff(
-            String symbol,
+    public static Integer timestampDiff(
+            String timeIntervalUnit,
             LocalZonedTimestampData fromTimestamp,
-            LocalZonedTimestampData toTimestamp) {
+            LocalZonedTimestampData toTimestamp,
+            String timezone) {
+        if (fromTimestamp == null || toTimestamp == null) {
+            return null;
+        }
         return timestampDiff(
-                symbol, fromTimestamp.getEpochMillisecond(), 
toTimestamp.getEpochMillisecond());
+                timeIntervalUnit,
+                fromTimestamp.getEpochMillisecond(),
+                timezone,
+                toTimestamp.getEpochMillisecond(),
+                timezone);
     }
 
-    public static int timestampDiff(
-            String symbol, TimestampData fromTimestamp, TimestampData 
toTimestamp) {
-        return timestampDiff(symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getMillisecond());
+    public static Integer timestampdiff(
+            String timeIntervalUnit,
+            LocalZonedTimestampData fromTimestamp,
+            LocalZonedTimestampData toTimestamp,
+            String timezone) {
+        return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, 
timezone);
     }
 
-    public static int timestampDiff(
-            String symbol, TimestampData fromTimestamp, 
LocalZonedTimestampData toTimestamp) {
+    public static Integer timestampDiff(
+            String timeIntervalUnit,
+            TimestampData fromTimestamp,
+            TimestampData toTimestamp,
+            String timezone) {
+        if (fromTimestamp == null || toTimestamp == null) {
+            return null;
+        }
         return timestampDiff(
-                symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getEpochMillisecond());
-    }
-
-    public static int timestampDiff(
-            String symbol, LocalZonedTimestampData fromTimestamp, 
TimestampData toTimestamp) {
+                timeIntervalUnit,
+                fromTimestamp.getMillisecond(),
+                "UTC",
+                toTimestamp.getMillisecond(),
+                "UTC");
+    }
+
+    public static Integer timestampdiff(
+            String timeIntervalUnit,
+            TimestampData fromTimestamp,
+            TimestampData toTimestamp,
+            String timezone) {
+        return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, 
timezone);
+    }
+
+    public static Integer timestampDiff(
+            String timeIntervalUnit,
+            TimestampData fromTimestamp,
+            LocalZonedTimestampData toTimestamp,
+            String timezone) {
+        if (fromTimestamp == null || toTimestamp == null) {
+            return null;
+        }
         return timestampDiff(
-                symbol, fromTimestamp.getEpochMillisecond(), 
toTimestamp.getMillisecond());
+                timeIntervalUnit,
+                fromTimestamp.getMillisecond(),
+                "UTC",
+                toTimestamp.getEpochMillisecond(),
+                timezone);
     }
 
-    public static int timestampDiff(
-            String symbol, ZonedTimestampData fromTimestamp, 
ZonedTimestampData toTimestamp) {
-        return timestampDiff(symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getMillisecond());
+    public static Integer timestampdiff(
+            String timeIntervalUnit,
+            TimestampData fromTimestamp,
+            LocalZonedTimestampData toTimestamp,
+            String timezone) {
+        return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, 
timezone);
     }
 
-    public static int timestampDiff(
-            String symbol, LocalZonedTimestampData fromTimestamp, 
ZonedTimestampData toTimestamp) {
+    public static Integer timestampDiff(
+            String timeIntervalUnit,
+            LocalZonedTimestampData fromTimestamp,
+            TimestampData toTimestamp,
+            String timezone) {
+        if (fromTimestamp == null || toTimestamp == null) {
+            return null;
+        }
         return timestampDiff(
-                symbol, fromTimestamp.getEpochMillisecond(), 
toTimestamp.getMillisecond());
+                timeIntervalUnit,
+                fromTimestamp.getEpochMillisecond(),
+                timezone,
+                toTimestamp.getMillisecond(),
+                "UTC");
     }
 
-    public static int timestampDiff(
-            String symbol, ZonedTimestampData fromTimestamp, 
LocalZonedTimestampData toTimestamp) {
-        return timestampDiff(
-                symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getEpochMillisecond());
+    public static Integer timestampdiff(
+            String timeIntervalUnit,
+            LocalZonedTimestampData fromTimestamp,
+            TimestampData toTimestamp,
+            String timezone) {
+        return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp, 
timezone);
+    }
+
+    public static Integer timestampDiff(
+            String timeIntervalUnit,
+            long fromDate,
+            String fromTimezone,
+            long toDate,
+            String toTimezone) {
+        Calendar from = 
Calendar.getInstance(TimeZone.getTimeZone(fromTimezone));
+        from.setTime(new Date(fromDate));
+        Calendar to = Calendar.getInstance(TimeZone.getTimeZone(toTimezone));
+        to.setTime(new Date(toDate));
+        long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
+        switch (timeIntervalUnit) {
+            case "SECOND":
+                if (second > Integer.MAX_VALUE) {
+                    return null;
+                }
+                return (int) second;
+            case "MINUTE":
+                if (second > Integer.MAX_VALUE) {
+                    return null;
+                }
+                return (int) second / 60;
+            case "HOUR":
+                if (second > Integer.MAX_VALUE) {
+                    return null;
+                }
+                return (int) second / 3600;
+            case "DAY":
+                if (second > Integer.MAX_VALUE) {
+                    return null;
+                }
+                return (int) second / (24 * 3600);
+            case "MONTH":
+                return to.get(Calendar.YEAR) * 12
+                        + to.get(Calendar.MONTH)
+                        - (from.get(Calendar.YEAR) * 12 + 
from.get(Calendar.MONTH));
+            case "YEAR":
+                return to.get(Calendar.YEAR) - from.get(Calendar.YEAR);
+            default:
+                throw new RuntimeException(
+                        String.format(
+                                "Unsupported timestamp interval unit %s. 
Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR",
+                                timeIntervalUnit));
+        }
     }
 
-    public static int timestampDiff(
-            String symbol, TimestampData fromTimestamp, ZonedTimestampData 
toTimestamp) {
-        return timestampDiff(symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getMillisecond());
+    public static LocalZonedTimestampData timestampadd(
+            String timeIntervalUnit,
+            Integer interval,
+            LocalZonedTimestampData timePoint,
+            String timezone) {
+        if (interval == null || timePoint == null) {
+            return null;
+        }
+        return LocalZonedTimestampData.fromEpochMillis(
+                timestampadd(
+                        timeIntervalUnit, interval, 
timePoint.getEpochMillisecond(), timezone));
     }
 
-    public static int timestampDiff(
-            String symbol, ZonedTimestampData fromTimestamp, TimestampData 
toTimestamp) {
-        return timestampDiff(symbol, fromTimestamp.getMillisecond(), 
toTimestamp.getMillisecond());
+    public static TimestampData timestampadd(
+            String timeIntervalUnit, Integer interval, TimestampData 
timePoint, String timezone) {
+        if (interval == null || timePoint == null) {
+            return null;
+        }
+        return TimestampData.fromMillis(
+                timestampadd(timeIntervalUnit, interval, 
timePoint.getMillisecond(), "UTC"));
     }
 
-    public static int timestampDiff(String symbol, long fromDate, long toDate) 
{
-        Calendar from = Calendar.getInstance();
-        from.setTime(new Date(fromDate));
-        Calendar to = Calendar.getInstance();
-        to.setTime(new Date(toDate));
-        Long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
-        switch (symbol) {
+    private static long timestampadd(
+            String timeIntervalUnit, int interval, long timePoint, String 
timezone) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTimeZone(TimeZone.getTimeZone(timezone));
+        calendar.setTime(new Date(timePoint));
+        int field;
+        switch (timeIntervalUnit) {
             case "SECOND":
-                return second.intValue();
+                field = Calendar.SECOND;
+                break;
             case "MINUTE":
-                return second.intValue() / 60;
+                field = Calendar.MINUTE;
+                break;
             case "HOUR":
-                return second.intValue() / 3600;
+                field = Calendar.HOUR;
+                break;
             case "DAY":
-                return second.intValue() / (24 * 3600);
+                field = Calendar.DAY_OF_YEAR;
+                break;

Review Comment:
   It seems `DAY_OF_MONTH`, `DAY_OF_YEAR`, and `DAY_OF_WEEK` all behave the same 
as `Calendar.DATE` when passed to `Calendar#add`. Maybe just use `DATE` for clarity?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to