wuchong commented on a change in pull request #15303:
URL: https://github.com/apache/flink/pull/15303#discussion_r601563830



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
##########
@@ -461,137 +462,44 @@ class CodeGeneratorContext(val tableConfig: TableConfig) 
{
   }
 
   /**
-   * Adds a reusable session-level timestamp to the beginning of the SAM of 
the generated class.
+   * Adds a reusable query-level timestamp to the beginning of the SAM of the 
generated class.
    *
-   * <p> The timestamp value is evaluated once for one session, this
-   * function is generally used for batch job.
+   * <p> The timestamp value is evaluated once at query-start, this
+   * function is generally used in batch job.
    */
-  def addReusableSessionTimestamp(): String = {
-    val fieldTerm = s"sessionTimestamp"
+  def addReusableQueryLevelCurrentTimestamp(): String = {
+    val fieldTerm = s"queryStartTimestamp"
 
     val queryStartEpoch = tableConfig.getConfiguration
-      .getLong("__table.query-start.epoch-time__", 0)
-
-    val queryStartEpochStr = if (queryStartEpoch != 0) {
-      s"${queryStartEpoch}L"
-    } else {
-      throw new CodeGenException("Try  to obtain current session timestamp 
fail. " +
-        "This is a bug, please file an issue.")
-    }
-
-    reusableMemberStatements.add(s"""
-    |private static final $TIMESTAMP_DATA $fieldTerm =
-    |$TIMESTAMP_DATA.fromEpochMillis($queryStartEpochStr);
-    |""".stripMargin)
-    fieldTerm
-  }
-
-  /**
-   * Adds a reusable row-level time to the beginning of the SAM of
-   * the generated [[Function]].
-   */
-  def addReusableTime(): String = {
-    val fieldTerm = s"time"
-
-    val timestamp = addReusableTimestamp()
-
-    // declaration
-    reusableMemberStatements.add(s"private int $fieldTerm;")
-
-    // assignment
-    // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
-    val field =
-      s"""
-         |$fieldTerm = (int) ($timestamp.getMillisecond() % 
${DateTimeUtils.MILLIS_PER_DAY});
-         |if (time < 0) {
-         |  time += ${DateTimeUtils.MILLIS_PER_DAY};
-         |}
-         |""".stripMargin
-    reusablePerRecordStatements.add(field)
-    fieldTerm
-  }
-
-  /**
-   * Adds a reusable session-level time to the beginning of the SAM of
-   * the generated [[Function]].
-   */
-  def addReusableSessionTime(): String = {
-    val fieldTerm = s"sessionTime"
-
-    val sessionTimestamp = addReusableSessionTimestamp()
-    reusableMemberStatements.add(
-      s"""
-      |private static final int $fieldTerm = (int)
-      | ($sessionTimestamp.getMillisecond() % ${DateTimeUtils.MILLIS_PER_DAY}) 
< 0 ?
-      | (int) ($sessionTimestamp.getMillisecond() %
-      | ${DateTimeUtils.MILLIS_PER_DAY}) + ${DateTimeUtils.MILLIS_PER_DAY} :
-      | (int) ($sessionTimestamp.getMillisecond() % 
${DateTimeUtils.MILLIS_PER_DAY});
-      |""".stripMargin)
-    fieldTerm
-  }
-
-  /**
-   * Adds a reusable row-level current time to the beginning of the SAM of
-   * the generated [[Function]].
-   */
-  def addReusableCurrentTime(): String = {
-    val fieldTerm = s"currentTime"
-
-    val timestamp = addReusableTimestamp()
-    val sessionTimeZone = addReusableSessionTimeZone()
-
-    // declaration
-    reusableMemberStatements.add(s"private int $fieldTerm;")
-
-    // assignment
-    // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
-    val field =
-    s"""
-       |$fieldTerm = (int) (($timestamp.getMillisecond()
-       |  + $sessionTimeZone.getOffset($timestamp.getMillisecond()))
-       |  % ${DateTimeUtils.MILLIS_PER_DAY});
-       |if ($fieldTerm < 0) {
-       |  $fieldTerm += ${DateTimeUtils.MILLIS_PER_DAY};
-       |}
-       |""".stripMargin
-    reusablePerRecordStatements.add(field)
-    fieldTerm
-  }
-
-  /**
-   * Adds a reusable session-level current time to the beginning of the SAM of
-   * the generated [[Function]].
-   */
-  def addReusableSessionCurrentTime(): String = {
-    val fieldTerm = s"sessionCurrentTime"
-
-    val sessionTimestamp = addReusableSessionTimestamp()
-    val sessionTimeZone = addReusableSessionTimeZone()
+      .getOptional(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME)
+      .orElseThrow(
+        new JSupplier[Throwable] {
+          override def get() = new CodeGenException(
+            "Try to obtain epoch time of query-start fail." +
+              " This is a bug, please file an issue.")
+        }
+      )
 
     reusableMemberStatements.add(
       s"""
-          |private static final int $fieldTerm =
-          | ((int) (($sessionTimestamp.getMillisecond()
-          |  + $sessionTimeZone.getOffset($sessionTimestamp.getMillisecond()))
-          |  % ${DateTimeUtils.MILLIS_PER_DAY})) < 0 ?
-          | ((int) (($sessionTimestamp.getMillisecond()
-          |  + $sessionTimeZone.getOffset($sessionTimestamp.getMillisecond()))
-          |  % ${DateTimeUtils.MILLIS_PER_DAY}) + 
${DateTimeUtils.MILLIS_PER_DAY}) :
-          |  (int) (($sessionTimestamp.getMillisecond()
-          |  + $sessionTimeZone.getOffset($sessionTimestamp.getMillisecond()))
-          |  % ${DateTimeUtils.MILLIS_PER_DAY});
+          |private static final $TIMESTAMP_DATA $fieldTerm =
+          |$TIMESTAMP_DATA.fromEpochMillis(${queryStartEpoch}L);
           |""".stripMargin)
     fieldTerm
   }
 
   /**
-    * Adds a reusable local date time to the beginning of the SAM of the 
generated class.
-    */
-  def addReusableLocalDateTime(): String = {
-    val fieldTerm = s"localtimestamp"
+   * Adds a reusable record-level local date time to the beginning of the
+   * SAM of the generated class.
+   *
+   * <p> The timestamp value is evaluated for per record, this
+   * function is generally used in stream job.
+   */
+  def addReusableRecordLevelLocalDateTime(): String = {
+    val fieldTerm = s"localTimestamp"
 
     val sessionTimeZone = addReusableSessionTimeZone()
-    val timestamp = addReusableTimestamp()
+    val timestamp = addReusableRecordLevelCurrentTimestamp()

Review comment:
       use registered `local-time` instead. 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
##########
@@ -436,7 +433,9 @@ abstract class PlannerBase(
     }
     validateAndOverrideConfiguration()
     val execGraph = ExecNodeGraph.createExecNodeGraph(jsonPlan, 
createSerdeContext)
-    translateToPlan(execGraph)
+    val transformations = translateToPlan(execGraph)
+    cleanupInternalConfigurations()

Review comment:
       Add `cleanupInternalConfigurations` for `getJsonPlan` method.

##########
File path: docs/data/sql_functions.yml
##########
@@ -415,23 +415,23 @@ temporal:
       NUMERIC.milli
       NUMERIC.millis
     description: Creates an interval of NUMERIC milliseconds.
-  - sql: CURRENT_DATE
-    table: currentDate()
-    description: Returns the current SQL date in the UTC time zone.
+  - sql: LOCALTIME
+      table: localTime()
+      description: Returns the current SQL time in the local time zone. It is 
evaluated for each record in streaming mode. But in batch mode, it is evaluated 
once as the query starts and uses the same result for every row.
+  - sql: LOCALTIMESTAMP
+    table: localTimestamp()
+    description: Returns the current SQL timestamp in local time zone, the 
return type is TIMESTAMP WITHOUT ITME ZONE. It is evaluated for each record in 
streaming mode. But in batch mode, it is evaluated once as the query starts and 
uses the same result for every row.
   - sql: CURRENT_TIME
     table: currentTime()
-    description: Returns the current SQL time in the local time zone. This 
function is not deterministic, it would be evaluated for each record in stream 
job, it would be evaluated once in batch job at query start.
+    description: Returns the current SQL time in the local time zone, this is 
a synonym of LOCAL_TIME.
+  - sql: CURRENT_DATE
+    table: currentDate()
+    description: Returns the current SQL date in the local time zone. It is 
evaluated for each record in streaming mode. But in batch mode, it is evaluated 
once as the query starts and uses the same result for every row.
   - sql: CURRENT_TIMESTAMP
     table: currentTimestamp()
-    description: Returns the current SQL timestamp in the local time zone, the 
return type is TIMESTAMP WITH LOCAL TIME ZONE. This function is not 
deterministic, it would be evaluated for each record in stream job, it would be 
evaluated once in batch job at query start.
-  - sql: LOCALTIME
-    table: localTime()
-    description: Returns the current SQL timestamp in the local time zone, 
this is a synonym of CURRENT_TIME. This function is not deterministic, it would 
be evaluated for each record in stream job, it would be evaluated once in batch 
job at query start.
-  - sql: LOCALTIMESTAMP
-    table: localTimestamp()
-    description: Returns the current SQL timestamp in local time zone, the 
return type is TIMESTAMP. This function is not deterministic, it would be 
evaluated for each record in stream job, it would be evaluated once in batch 
job at query start.
+    description: Returns the current SQL timestamp in the local time zone, the 
return type is TIMESTAMP WITH LOCAL TIME ZONE. It is evaluated for each record 
in streaming mode. But in batch mode, it is evaluated once as the query starts 
and uses the same result for every row.
   - sql: NOW()
-    description: Returns the current SQL timestamp in the local time zone, 
this is a synonym of CURRENT_TIME. This function is not deterministic, it would 
be evaluated for each record in stream job, it would be evaluated once in batch 
job at query start.
+    description: Returns the current SQL timestamp in the local time zone, 
this is a synonym of CURRENT_TIMESTAMP. It is evaluated for each record in 
streaming mode.

Review comment:
       Remove the ending `It is evaluated for each record in streaming mode.`.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
##########
@@ -161,65 +148,29 @@ class NonDeterministicTests extends ExpressionTestBase {
     val formattedCurrentTimestamp = localDateTime
       .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
 
-    // the CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP/NOW() functions are
-    // not deterministic, thus we use following pattern to check the returned
-    // SQL timestamp in session time zone UTC
+    // the 
LOCALTIME/LOCALTIMESTAMP/CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP/NOW() 
functions
+    // are not deterministic, thus we use following pattern to check the 
returned SQL value
+    // in session time zone
     testSqlApi(
-      s"DATE_SUB(CURRENT_DATE, DATE '$formattedCurrentDate') = 0",
+      s"TIME_SUB(LOCALTIME, TIME '$formattedLocalTime') <= 60000",
       "true")
-
     testSqlApi(
-      s"TIME_SUB(CURRENT_TIME, TIME '$formattedCurrentTime') <= 60000",
+      s"TIMESTAMPDIFF(SECOND, TIMESTAMP '$formattedLocalDateTime', 
LOCALTIMESTAMP) <= 60",
       "true")
-
     testSqlApi(
-      s"${timestampLtz(formattedCurrentTimestamp)} < CURRENT_TIMESTAMP",
+      s"DATE_SUB(CURRENT_DATE, DATE '$formattedCurrentDate') >= 0",
       "true")
 
     testSqlApi(
-      s"${timestampLtz(formattedCurrentTimestamp)} < NOW()",
+      s"TIME_SUB(CURRENT_TIME, TIME '$formattedCurrentTime') <= 60000",
       "true")
-  }
 
-  @Test
-  def testLocalTimestampInUTC(): Unit = {
-    config.setLocalTimeZone(ZoneId.of("UTC"))
-    val localDateTime = LocalDateTime.now(ZoneId.of("UTC"))
-
-    val formattedLocalTime = localDateTime
-      .toLocalTime
-      .format(DateTimeFormatter.ofPattern("HH:mm:ss"))
-    val formattedLocalDateTime = localDateTime
-      .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
-
-    // the LOCALTIME/LOCALTIMESTAMP functions are not deterministic, thus we
-    // use following pattern to check the returned SQL timestamp in session 
time zone UTC
-    testSqlApi(
-      s"TIME_SUB(LOCALTIME, TIME '$formattedLocalTime') <= 60000",
-      "true")
     testSqlApi(
-      s"TIMESTAMPDIFF(SECOND, TIMESTAMP '$formattedLocalDateTime', 
LOCALTIMESTAMP) <= 60",
+      s"TIMESTAMPDIFF(SECOND, ${timestampLtz(formattedCurrentTimestamp)}, 
CURRENT_TIMESTAMP) <= 60",
       "true")
-  }
 
-  @Test
-  def testLocalTimestampInShanghai(): Unit = {
-    config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
-    val localDateTime = LocalDateTime.now(ZoneId.of("Asia/Shanghai"))
-
-    val formattedLocalTime = localDateTime
-      .toLocalTime
-      .format(DateTimeFormatter.ofPattern("HH:mm:ss"))
-    val formattedLocalDateTime = localDateTime
-      .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
-
-    // the LOCALTIME/LOCALTIMESTAMP functions are not deterministic, thus we
-    // use following pattern to check the returned SQL timestamp in session 
time zone Shanghai
-    testSqlApi(
-      s"TIME_SUB(LOCALTIME, TIME '$formattedLocalTime') <= 60000",
-      "true")
     testSqlApi(
-      s"TIMESTAMPDIFF(SECOND, TIMESTAMP '$formattedLocalDateTime', 
LOCALTIMESTAMP) <= 60",
+      s"${timestampLtz(formattedCurrentTimestamp)} < NOW()",

Review comment:
       Use `TIMESTAMPDIFF` .




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to