yuxiqian commented on code in PR #3698:
URL: https://github.com/apache/flink-cdc/pull/3698#discussion_r1906325845
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -108,83 +108,305 @@ public static TimestampData toTimestamp(String str,
String format, String timezo
}
}
- public static int timestampDiff(
- String symbol,
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
LocalZonedTimestampData fromTimestamp,
LocalZonedTimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
return timestampDiff(
- symbol, fromTimestamp.getEpochMillisecond(),
toTimestamp.getEpochMillisecond());
+ timeIntervalUnit,
+ fromTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId(),
+ toTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId());
}
- public static int timestampDiff(
- String symbol, TimestampData fromTimestamp, TimestampData
toTimestamp) {
- return timestampDiff(symbol, fromTimestamp.getMillisecond(),
toTimestamp.getMillisecond());
+ public static Integer timestampdiff(
+ String timeIntervalUnit,
+ LocalZonedTimestampData fromTimestamp,
+ LocalZonedTimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
}
- public static int timestampDiff(
- String symbol, TimestampData fromTimestamp,
LocalZonedTimestampData toTimestamp) {
+ public static Integer timestampDiff(
+ String timeIntervalUnit, TimestampData fromTimestamp,
TimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
return timestampDiff(
- symbol, fromTimestamp.getMillisecond(),
toTimestamp.getEpochMillisecond());
+ timeIntervalUnit,
+ fromTimestamp.getMillisecond(),
+ null,
+ toTimestamp.getMillisecond(),
+ null);
+ }
+
+ public static Integer timestampdiff(
+ String timeIntervalUnit, TimestampData fromTimestamp,
TimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
}
- public static int timestampDiff(
- String symbol, LocalZonedTimestampData fromTimestamp,
TimestampData toTimestamp) {
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
+ TimestampData fromTimestamp,
+ LocalZonedTimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
return timestampDiff(
- symbol, fromTimestamp.getEpochMillisecond(),
toTimestamp.getMillisecond());
+ timeIntervalUnit,
+ fromTimestamp.getMillisecond(),
+ null,
+ toTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId());
}
- public static int timestampDiff(
- String symbol, ZonedTimestampData fromTimestamp,
ZonedTimestampData toTimestamp) {
- return timestampDiff(symbol, fromTimestamp.getMillisecond(),
toTimestamp.getMillisecond());
+ public static Integer timestampdiff(
+ String timeIntervalUnit,
+ TimestampData fromTimestamp,
+ LocalZonedTimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
+ }
+
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
+ LocalZonedTimestampData fromTimestamp,
+ TimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
+ return timestampDiff(
+ timeIntervalUnit,
+ fromTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId(),
+ toTimestamp.getMillisecond(),
+ null);
+ }
+
+ public static Integer timestampdiff(
+ String timeIntervalUnit,
+ LocalZonedTimestampData fromTimestamp,
+ TimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
+ }
+
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
+ ZonedTimestampData fromTimestamp,
+ ZonedTimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
+ return timestampDiff(
+ timeIntervalUnit,
+ fromTimestamp.getMillisecond(),
+ fromTimestamp.getZoneId(),
+ toTimestamp.getMillisecond(),
+ toTimestamp.getZoneId());
+ }
+
+ public static Integer timestampdiff(
+ String timeIntervalUnit,
+ ZonedTimestampData fromTimestamp,
+ ZonedTimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
+ }
+
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
+ LocalZonedTimestampData fromTimestamp,
+ ZonedTimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
+ return timestampDiff(
+ timeIntervalUnit,
+ fromTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId(),
+ toTimestamp.getMillisecond(),
+ toTimestamp.getZoneId());
+ }
+
+ public static Integer timestampdiff(
+ String timeIntervalUnit,
+ LocalZonedTimestampData fromTimestamp,
+ ZonedTimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
}
- public static int timestampDiff(
- String symbol, LocalZonedTimestampData fromTimestamp,
ZonedTimestampData toTimestamp) {
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
+ ZonedTimestampData fromTimestamp,
+ LocalZonedTimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
return timestampDiff(
- symbol, fromTimestamp.getEpochMillisecond(),
toTimestamp.getMillisecond());
+ timeIntervalUnit,
+ fromTimestamp.getMillisecond(),
+ fromTimestamp.getZoneId(),
+ toTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId());
+ }
+
+ public static Integer timestampdiff(
+ String timeIntervalUnit,
+ ZonedTimestampData fromTimestamp,
+ LocalZonedTimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
}
- public static int timestampDiff(
- String symbol, ZonedTimestampData fromTimestamp,
LocalZonedTimestampData toTimestamp) {
+ public static Integer timestampDiff(
+ String timeIntervalUnit, TimestampData fromTimestamp,
ZonedTimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
return timestampDiff(
- symbol, fromTimestamp.getMillisecond(),
toTimestamp.getEpochMillisecond());
+ timeIntervalUnit,
+ fromTimestamp.getMillisecond(),
+ null,
+ toTimestamp.getMillisecond(),
+ toTimestamp.getZoneId());
}
- public static int timestampDiff(
- String symbol, TimestampData fromTimestamp, ZonedTimestampData
toTimestamp) {
- return timestampDiff(symbol, fromTimestamp.getMillisecond(),
toTimestamp.getMillisecond());
+ public static Integer timestampdiff(
+ String timeIntervalUnit, TimestampData fromTimestamp,
ZonedTimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
}
- public static int timestampDiff(
- String symbol, ZonedTimestampData fromTimestamp, TimestampData
toTimestamp) {
- return timestampDiff(symbol, fromTimestamp.getMillisecond(),
toTimestamp.getMillisecond());
+ public static Integer timestampDiff(
+ String timeIntervalUnit, ZonedTimestampData fromTimestamp,
TimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
+ return timestampDiff(
+ timeIntervalUnit,
+ fromTimestamp.getMillisecond(),
+ fromTimestamp.getZoneId(),
+ toTimestamp.getMillisecond(),
+ null);
+ }
+
+ public static Integer timestampdiff(
+ String timeIntervalUnit, ZonedTimestampData fromTimestamp,
TimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
}
- public static int timestampDiff(String symbol, long fromDate, long toDate)
{
- Calendar from = Calendar.getInstance();
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
+ long fromDate,
+ String fromDateZoneId,
+ long toDate,
+ String toDateZoneId) {
+ if (fromDateZoneId == null || fromDateZoneId.isEmpty()) {
+ fromDateZoneId = "UTC";
+ }
+ if (toDateZoneId == null || toDateZoneId.isEmpty()) {
+ toDateZoneId = "UTC";
+ }
Review Comment:
Ditto, why introducing timezone-aware timestampDiff?
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -108,83 +108,305 @@ public static TimestampData toTimestamp(String str,
String format, String timezo
}
}
- public static int timestampDiff(
- String symbol,
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
LocalZonedTimestampData fromTimestamp,
LocalZonedTimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
return timestampDiff(
- symbol, fromTimestamp.getEpochMillisecond(),
toTimestamp.getEpochMillisecond());
+ timeIntervalUnit,
+ fromTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId(),
+ toTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId());
Review Comment:
I think it's potentially risky to use `ZoneId.systemDefault` by default
since crossing Daylight Saving Time boundaries might cause unexpected results.
Why not just stick to UTC and remove the "TimeZone" parameter from all
`timestampdiff` function (they're not exposed to user anyway)?
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -108,83 +108,305 @@ public static TimestampData toTimestamp(String str,
String format, String timezo
}
}
- public static int timestampDiff(
- String symbol,
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
LocalZonedTimestampData fromTimestamp,
LocalZonedTimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
return timestampDiff(
- symbol, fromTimestamp.getEpochMillisecond(),
toTimestamp.getEpochMillisecond());
+ timeIntervalUnit,
+ fromTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId(),
+ toTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId());
}
- public static int timestampDiff(
- String symbol, TimestampData fromTimestamp, TimestampData
toTimestamp) {
- return timestampDiff(symbol, fromTimestamp.getMillisecond(),
toTimestamp.getMillisecond());
+ public static Integer timestampdiff(
+ String timeIntervalUnit,
+ LocalZonedTimestampData fromTimestamp,
+ LocalZonedTimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
}
- public static int timestampDiff(
- String symbol, TimestampData fromTimestamp,
LocalZonedTimestampData toTimestamp) {
+ public static Integer timestampDiff(
+ String timeIntervalUnit, TimestampData fromTimestamp,
TimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
return timestampDiff(
- symbol, fromTimestamp.getMillisecond(),
toTimestamp.getEpochMillisecond());
+ timeIntervalUnit,
+ fromTimestamp.getMillisecond(),
+ null,
+ toTimestamp.getMillisecond(),
+ null);
+ }
+
+ public static Integer timestampdiff(
+ String timeIntervalUnit, TimestampData fromTimestamp,
TimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
}
- public static int timestampDiff(
- String symbol, LocalZonedTimestampData fromTimestamp,
TimestampData toTimestamp) {
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
+ TimestampData fromTimestamp,
+ LocalZonedTimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
return timestampDiff(
- symbol, fromTimestamp.getEpochMillisecond(),
toTimestamp.getMillisecond());
+ timeIntervalUnit,
+ fromTimestamp.getMillisecond(),
+ null,
+ toTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId());
}
- public static int timestampDiff(
- String symbol, ZonedTimestampData fromTimestamp,
ZonedTimestampData toTimestamp) {
- return timestampDiff(symbol, fromTimestamp.getMillisecond(),
toTimestamp.getMillisecond());
+ public static Integer timestampdiff(
+ String timeIntervalUnit,
+ TimestampData fromTimestamp,
+ LocalZonedTimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
+ }
+
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
+ LocalZonedTimestampData fromTimestamp,
+ TimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
+ return timestampDiff(
+ timeIntervalUnit,
+ fromTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId(),
+ toTimestamp.getMillisecond(),
+ null);
+ }
+
+ public static Integer timestampdiff(
+ String timeIntervalUnit,
+ LocalZonedTimestampData fromTimestamp,
+ TimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
+ }
+
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
+ ZonedTimestampData fromTimestamp,
+ ZonedTimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
+ return timestampDiff(
+ timeIntervalUnit,
+ fromTimestamp.getMillisecond(),
+ fromTimestamp.getZoneId(),
+ toTimestamp.getMillisecond(),
+ toTimestamp.getZoneId());
+ }
+
+ public static Integer timestampdiff(
+ String timeIntervalUnit,
+ ZonedTimestampData fromTimestamp,
+ ZonedTimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
+ }
+
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
+ LocalZonedTimestampData fromTimestamp,
+ ZonedTimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
+ return timestampDiff(
+ timeIntervalUnit,
+ fromTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId(),
+ toTimestamp.getMillisecond(),
+ toTimestamp.getZoneId());
+ }
+
+ public static Integer timestampdiff(
+ String timeIntervalUnit,
+ LocalZonedTimestampData fromTimestamp,
+ ZonedTimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
}
- public static int timestampDiff(
- String symbol, LocalZonedTimestampData fromTimestamp,
ZonedTimestampData toTimestamp) {
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
+ ZonedTimestampData fromTimestamp,
+ LocalZonedTimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
return timestampDiff(
- symbol, fromTimestamp.getEpochMillisecond(),
toTimestamp.getMillisecond());
+ timeIntervalUnit,
+ fromTimestamp.getMillisecond(),
+ fromTimestamp.getZoneId(),
+ toTimestamp.getEpochMillisecond(),
+ ZoneId.systemDefault().getId());
+ }
+
+ public static Integer timestampdiff(
+ String timeIntervalUnit,
+ ZonedTimestampData fromTimestamp,
+ LocalZonedTimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
}
- public static int timestampDiff(
- String symbol, ZonedTimestampData fromTimestamp,
LocalZonedTimestampData toTimestamp) {
+ public static Integer timestampDiff(
+ String timeIntervalUnit, TimestampData fromTimestamp,
ZonedTimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
return timestampDiff(
- symbol, fromTimestamp.getMillisecond(),
toTimestamp.getEpochMillisecond());
+ timeIntervalUnit,
+ fromTimestamp.getMillisecond(),
+ null,
+ toTimestamp.getMillisecond(),
+ toTimestamp.getZoneId());
}
- public static int timestampDiff(
- String symbol, TimestampData fromTimestamp, ZonedTimestampData
toTimestamp) {
- return timestampDiff(symbol, fromTimestamp.getMillisecond(),
toTimestamp.getMillisecond());
+ public static Integer timestampdiff(
+ String timeIntervalUnit, TimestampData fromTimestamp,
ZonedTimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
}
- public static int timestampDiff(
- String symbol, ZonedTimestampData fromTimestamp, TimestampData
toTimestamp) {
- return timestampDiff(symbol, fromTimestamp.getMillisecond(),
toTimestamp.getMillisecond());
+ public static Integer timestampDiff(
+ String timeIntervalUnit, ZonedTimestampData fromTimestamp,
TimestampData toTimestamp) {
+ if (fromTimestamp == null || toTimestamp == null) {
+ return null;
+ }
+ return timestampDiff(
+ timeIntervalUnit,
+ fromTimestamp.getMillisecond(),
+ fromTimestamp.getZoneId(),
+ toTimestamp.getMillisecond(),
+ null);
+ }
+
+ public static Integer timestampdiff(
+ String timeIntervalUnit, ZonedTimestampData fromTimestamp,
TimestampData toTimestamp) {
+ return timestampDiff(timeIntervalUnit, fromTimestamp, toTimestamp);
}
- public static int timestampDiff(String symbol, long fromDate, long toDate)
{
- Calendar from = Calendar.getInstance();
+ public static Integer timestampDiff(
+ String timeIntervalUnit,
+ long fromDate,
+ String fromDateZoneId,
+ long toDate,
+ String toDateZoneId) {
+ if (fromDateZoneId == null || fromDateZoneId.isEmpty()) {
+ fromDateZoneId = "UTC";
+ }
+ if (toDateZoneId == null || toDateZoneId.isEmpty()) {
+ toDateZoneId = "UTC";
+ }
+ Calendar from =
Calendar.getInstance(TimeZone.getTimeZone(fromDateZoneId));
from.setTime(new Date(fromDate));
- Calendar to = Calendar.getInstance();
+ Calendar to = Calendar.getInstance(TimeZone.getTimeZone(toDateZoneId));
to.setTime(new Date(toDate));
- Long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
- switch (symbol) {
+ long second = (to.getTimeInMillis() - from.getTimeInMillis()) / 1000;
+ switch (timeIntervalUnit) {
case "SECOND":
- return second.intValue();
+ if (second > Integer.MAX_VALUE) {
+ return null;
+ }
+ return (int) second;
case "MINUTE":
- return second.intValue() / 60;
+ if (second > Integer.MAX_VALUE) {
+ return null;
+ }
+ return (int) second / 60;
case "HOUR":
- return second.intValue() / 3600;
+ if (second > Integer.MAX_VALUE) {
+ return null;
+ }
+ return (int) second / 3600;
case "DAY":
- return second.intValue() / (24 * 3600);
+ if (second > Integer.MAX_VALUE) {
+ return null;
+ }
+ return (int) second / (24 * 3600);
case "MONTH":
return to.get(Calendar.YEAR) * 12
- + to.get(Calendar.MONDAY)
- - (from.get(Calendar.YEAR) * 12 +
from.get(Calendar.MONDAY));
+ + to.get(Calendar.MONTH)
+ - (from.get(Calendar.YEAR) * 12 +
from.get(Calendar.MONTH));
case "YEAR":
return to.get(Calendar.YEAR) - from.get(Calendar.YEAR);
default:
- LOG.error("Unsupported timestamp diff: {}", symbol);
- throw new RuntimeException("Unsupported timestamp diff: " +
symbol);
+ throw new RuntimeException(
+ String.format(
+ "Unsupported timestamp interval unit %s.
Supported units are: SECOND, MINUTE, HOUR, DAY, MONTH, YEAR",
+ timeIntervalUnit));
+ }
+ }
+
+ public static LocalZonedTimestampData timestampadd(
+ String timeIntervalUnit, Integer interval, LocalZonedTimestampData
timePoint) {
+ if (interval == null || timePoint == null) {
+ return null;
+ }
+ return LocalZonedTimestampData.fromEpochMillis(
+ timestampadd(timeIntervalUnit, interval,
timePoint.getEpochMillisecond()));
+ }
+
+ public static ZonedTimestampData timestampadd(
+ String timeIntervalUnit, Integer interval, ZonedTimestampData
timePoint) {
+ if (interval == null || timePoint == null) {
+ return null;
+ }
+ return ZonedTimestampData.of(
+ timestampadd(timeIntervalUnit, interval,
timePoint.getMillisecond()),
+ 0,
+ timePoint.getZoneId());
+ }
+
+ public static TimestampData timestampadd(
+ String timeIntervalUnit, Integer interval, TimestampData
timePoint) {
+ if (interval == null || timePoint == null) {
+ return null;
+ }
+ return TimestampData.fromMillis(
+ timestampadd(timeIntervalUnit, interval,
timePoint.getMillisecond()));
+ }
+
+ private static long timestampadd(String timeIntervalUnit, int interval,
long timePoint) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
+ calendar.setTime(new Date(timePoint));
Review Comment:
Can we cache a static `Calendar` and avoid creating one every time when this
method got invoked?
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlReturnTypes.java:
##########
@@ -188,4 +188,12 @@ public RelDataType inferReturnType(SqlOperatorBinding
opBinding) {
return opBinding.getTypeFactory().leastRestrictive(types);
}
}
+
+ public static final SqlReturnTypeInference ARG2_TIMESTAMP_FORCE_NULLABLE =
+ new OrdinalReturnTypeInference(2) {
+ @Override
+ public RelDataType inferReturnType(SqlOperatorBinding
opBinding) {
+ return super.inferReturnType(opBinding);
+ }
Review Comment:
Is this override necessary?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]