wuchong commented on a change in pull request #15303:
URL: https://github.com/apache/flink/pull/15303#discussion_r601563830
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
##########
@@ -461,137 +462,44 @@ class CodeGeneratorContext(val tableConfig: TableConfig)
{
}
/**
- * Adds a reusable session-level timestamp to the beginning of the SAM of
the generated class.
+ * Adds a reusable query-level timestamp to the beginning of the SAM of the
generated class.
*
- * <p> The timestamp value is evaluated once for one session, this
- * function is generally used for batch job.
+ * <p> The timestamp value is evaluated once at query-start, this
+ * function is generally used in batch job.
*/
- def addReusableSessionTimestamp(): String = {
- val fieldTerm = s"sessionTimestamp"
+ def addReusableQueryLevelCurrentTimestamp(): String = {
+ val fieldTerm = s"queryStartTimestamp"
val queryStartEpoch = tableConfig.getConfiguration
- .getLong("__table.query-start.epoch-time__", 0)
-
- val queryStartEpochStr = if (queryStartEpoch != 0) {
- s"${queryStartEpoch}L"
- } else {
- throw new CodeGenException("Try to obtain current session timestamp
fail. " +
- "This is a bug, please file an issue.")
- }
-
- reusableMemberStatements.add(s"""
- |private static final $TIMESTAMP_DATA $fieldTerm =
- |$TIMESTAMP_DATA.fromEpochMillis($queryStartEpochStr);
- |""".stripMargin)
- fieldTerm
- }
-
- /**
- * Adds a reusable row-level time to the beginning of the SAM of
- * the generated [[Function]].
- */
- def addReusableTime(): String = {
- val fieldTerm = s"time"
-
- val timestamp = addReusableTimestamp()
-
- // declaration
- reusableMemberStatements.add(s"private int $fieldTerm;")
-
- // assignment
- // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
- val field =
- s"""
- |$fieldTerm = (int) ($timestamp.getMillisecond() %
${DateTimeUtils.MILLIS_PER_DAY});
- |if (time < 0) {
- | time += ${DateTimeUtils.MILLIS_PER_DAY};
- |}
- |""".stripMargin
- reusablePerRecordStatements.add(field)
- fieldTerm
- }
-
- /**
- * Adds a reusable session-level time to the beginning of the SAM of
- * the generated [[Function]].
- */
- def addReusableSessionTime(): String = {
- val fieldTerm = s"sessionTime"
-
- val sessionTimestamp = addReusableSessionTimestamp()
- reusableMemberStatements.add(
- s"""
- |private static final int $fieldTerm = (int)
- | ($sessionTimestamp.getMillisecond() % ${DateTimeUtils.MILLIS_PER_DAY})
< 0 ?
- | (int) ($sessionTimestamp.getMillisecond() %
- | ${DateTimeUtils.MILLIS_PER_DAY}) + ${DateTimeUtils.MILLIS_PER_DAY} :
- | (int) ($sessionTimestamp.getMillisecond() %
${DateTimeUtils.MILLIS_PER_DAY});
- |""".stripMargin)
- fieldTerm
- }
-
- /**
- * Adds a reusable row-level current time to the beginning of the SAM of
- * the generated [[Function]].
- */
- def addReusableCurrentTime(): String = {
- val fieldTerm = s"currentTime"
-
- val timestamp = addReusableTimestamp()
- val sessionTimeZone = addReusableSessionTimeZone()
-
- // declaration
- reusableMemberStatements.add(s"private int $fieldTerm;")
-
- // assignment
- // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
- val field =
- s"""
- |$fieldTerm = (int) (($timestamp.getMillisecond()
- | + $sessionTimeZone.getOffset($timestamp.getMillisecond()))
- | % ${DateTimeUtils.MILLIS_PER_DAY});
- |if ($fieldTerm < 0) {
- | $fieldTerm += ${DateTimeUtils.MILLIS_PER_DAY};
- |}
- |""".stripMargin
- reusablePerRecordStatements.add(field)
- fieldTerm
- }
-
- /**
- * Adds a reusable session-level current time to the beginning of the SAM of
- * the generated [[Function]].
- */
- def addReusableSessionCurrentTime(): String = {
- val fieldTerm = s"sessionCurrentTime"
-
- val sessionTimestamp = addReusableSessionTimestamp()
- val sessionTimeZone = addReusableSessionTimeZone()
+ .getOptional(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME)
+ .orElseThrow(
+ new JSupplier[Throwable] {
+ override def get() = new CodeGenException(
+ "Try to obtain epoch time of query-start fail." +
+ " This is a bug, please file an issue.")
+ }
+ )
reusableMemberStatements.add(
s"""
- |private static final int $fieldTerm =
- | ((int) (($sessionTimestamp.getMillisecond()
- | + $sessionTimeZone.getOffset($sessionTimestamp.getMillisecond()))
- | % ${DateTimeUtils.MILLIS_PER_DAY})) < 0 ?
- | ((int) (($sessionTimestamp.getMillisecond()
- | + $sessionTimeZone.getOffset($sessionTimestamp.getMillisecond()))
- | % ${DateTimeUtils.MILLIS_PER_DAY}) +
${DateTimeUtils.MILLIS_PER_DAY}) :
- | (int) (($sessionTimestamp.getMillisecond()
- | + $sessionTimeZone.getOffset($sessionTimestamp.getMillisecond()))
- | % ${DateTimeUtils.MILLIS_PER_DAY});
+ |private static final $TIMESTAMP_DATA $fieldTerm =
+ |$TIMESTAMP_DATA.fromEpochMillis(${queryStartEpoch}L);
|""".stripMargin)
fieldTerm
}
/**
- * Adds a reusable local date time to the beginning of the SAM of the
generated class.
- */
- def addReusableLocalDateTime(): String = {
- val fieldTerm = s"localtimestamp"
+ * Adds a reusable record-level local date time to the beginning of the
+ * SAM of the generated class.
+ *
+ * <p> The timestamp value is evaluated for per record, this
+ * function is generally used in stream job.
+ */
+ def addReusableRecordLevelLocalDateTime(): String = {
+ val fieldTerm = s"localTimestamp"
val sessionTimeZone = addReusableSessionTimeZone()
- val timestamp = addReusableTimestamp()
+ val timestamp = addReusableRecordLevelCurrentTimestamp()
Review comment:
Use the registered `local-time` instead.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
##########
@@ -436,7 +433,9 @@ abstract class PlannerBase(
}
validateAndOverrideConfiguration()
val execGraph = ExecNodeGraph.createExecNodeGraph(jsonPlan,
createSerdeContext)
- translateToPlan(execGraph)
+ val transformations = translateToPlan(execGraph)
+ cleanupInternalConfigurations()
Review comment:
Add a `cleanupInternalConfigurations` call to the `getJsonPlan` method.
##########
File path: docs/data/sql_functions.yml
##########
@@ -415,23 +415,23 @@ temporal:
NUMERIC.milli
NUMERIC.millis
description: Creates an interval of NUMERIC milliseconds.
- - sql: CURRENT_DATE
- table: currentDate()
- description: Returns the current SQL date in the UTC time zone.
+ - sql: LOCALTIME
+ table: localTime()
+ description: Returns the current SQL time in the local time zone. It is
evaluated for each record in streaming mode. But in batch mode, it is evaluated
once as the query starts and uses the same result for every row.
+ - sql: LOCALTIMESTAMP
+ table: localTimestamp()
+ description: Returns the current SQL timestamp in local time zone, the
return type is TIMESTAMP WITHOUT TIME ZONE. It is evaluated for each record in
streaming mode. But in batch mode, it is evaluated once as the query starts and
uses the same result for every row.
- sql: CURRENT_TIME
table: currentTime()
- description: Returns the current SQL time in the local time zone. This
function is not deterministic, it would be evaluated for each record in stream
job, it would be evaluated once in batch job at query start.
+ description: Returns the current SQL time in the local time zone, this is
a synonym of LOCALTIME.
+ - sql: CURRENT_DATE
+ table: currentDate()
+ description: Returns the current SQL date in the local time zone. It is
evaluated for each record in streaming mode. But in batch mode, it is evaluated
once as the query starts and uses the same result for every row.
- sql: CURRENT_TIMESTAMP
table: currentTimestamp()
- description: Returns the current SQL timestamp in the local time zone, the
return type is TIMESTAMP WITH LOCAL TIME ZONE. This function is not
deterministic, it would be evaluated for each record in stream job, it would be
evaluated once in batch job at query start.
- - sql: LOCALTIME
- table: localTime()
- description: Returns the current SQL timestamp in the local time zone,
this is a synonym of CURRENT_TIME. This function is not deterministic, it would
be evaluated for each record in stream job, it would be evaluated once in batch
job at query start.
- - sql: LOCALTIMESTAMP
- table: localTimestamp()
- description: Returns the current SQL timestamp in local time zone, the
return type is TIMESTAMP. This function is not deterministic, it would be
evaluated for each record in stream job, it would be evaluated once in batch
job at query start.
+ description: Returns the current SQL timestamp in the local time zone, the
return type is TIMESTAMP WITH LOCAL TIME ZONE. It is evaluated for each record
in streaming mode. But in batch mode, it is evaluated once as the query starts
and uses the same result for every row.
- sql: NOW()
- description: Returns the current SQL timestamp in the local time zone,
this is a synonym of CURRENT_TIME. This function is not deterministic, it would
be evaluated for each record in stream job, it would be evaluated once in batch
job at query start.
+ description: Returns the current SQL timestamp in the local time zone,
this is a synonym of CURRENT_TIMESTAMP. It is evaluated for each record in
streaming mode.
Review comment:
Remove the trailing sentence `It is evaluated for each record in streaming mode.`
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
##########
@@ -161,65 +148,29 @@ class NonDeterministicTests extends ExpressionTestBase {
val formattedCurrentTimestamp = localDateTime
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
- // the CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP/NOW() functions are
- // not deterministic, thus we use following pattern to check the returned
- // SQL timestamp in session time zone UTC
+ // the
LOCALTIME/LOCALTIMESTAMP/CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP/NOW()
functions
+ // are not deterministic, thus we use following pattern to check the
returned SQL value
+ // in session time zone
testSqlApi(
- s"DATE_SUB(CURRENT_DATE, DATE '$formattedCurrentDate') = 0",
+ s"TIME_SUB(LOCALTIME, TIME '$formattedLocalTime') <= 60000",
"true")
-
testSqlApi(
- s"TIME_SUB(CURRENT_TIME, TIME '$formattedCurrentTime') <= 60000",
+ s"TIMESTAMPDIFF(SECOND, TIMESTAMP '$formattedLocalDateTime',
LOCALTIMESTAMP) <= 60",
"true")
-
testSqlApi(
- s"${timestampLtz(formattedCurrentTimestamp)} < CURRENT_TIMESTAMP",
+ s"DATE_SUB(CURRENT_DATE, DATE '$formattedCurrentDate') >= 0",
"true")
testSqlApi(
- s"${timestampLtz(formattedCurrentTimestamp)} < NOW()",
+ s"TIME_SUB(CURRENT_TIME, TIME '$formattedCurrentTime') <= 60000",
"true")
- }
- @Test
- def testLocalTimestampInUTC(): Unit = {
- config.setLocalTimeZone(ZoneId.of("UTC"))
- val localDateTime = LocalDateTime.now(ZoneId.of("UTC"))
-
- val formattedLocalTime = localDateTime
- .toLocalTime
- .format(DateTimeFormatter.ofPattern("HH:mm:ss"))
- val formattedLocalDateTime = localDateTime
- .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
-
- // the LOCALTIME/LOCALTIMESTAMP functions are not deterministic, thus we
- // use following pattern to check the returned SQL timestamp in session
time zone UTC
- testSqlApi(
- s"TIME_SUB(LOCALTIME, TIME '$formattedLocalTime') <= 60000",
- "true")
testSqlApi(
- s"TIMESTAMPDIFF(SECOND, TIMESTAMP '$formattedLocalDateTime',
LOCALTIMESTAMP) <= 60",
+ s"TIMESTAMPDIFF(SECOND, ${timestampLtz(formattedCurrentTimestamp)},
CURRENT_TIMESTAMP) <= 60",
"true")
- }
- @Test
- def testLocalTimestampInShanghai(): Unit = {
- config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
- val localDateTime = LocalDateTime.now(ZoneId.of("Asia/Shanghai"))
-
- val formattedLocalTime = localDateTime
- .toLocalTime
- .format(DateTimeFormatter.ofPattern("HH:mm:ss"))
- val formattedLocalDateTime = localDateTime
- .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
-
- // the LOCALTIME/LOCALTIMESTAMP functions are not deterministic, thus we
- // use following pattern to check the returned SQL timestamp in session
time zone Shanghai
- testSqlApi(
- s"TIME_SUB(LOCALTIME, TIME '$formattedLocalTime') <= 60000",
- "true")
testSqlApi(
- s"TIMESTAMPDIFF(SECOND, TIMESTAMP '$formattedLocalDateTime',
LOCALTIMESTAMP) <= 60",
+ s"${timestampLtz(formattedCurrentTimestamp)} < NOW()",
Review comment:
Use `TIMESTAMPDIFF` here too, as in the `CURRENT_TIMESTAMP` check above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]