leonardBang commented on a change in pull request #15303:
URL: https://github.com/apache/flink/pull/15303#discussion_r601620460



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
##########
@@ -461,137 +462,44 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   }
 
   /**
-   * Adds a reusable session-level timestamp to the beginning of the SAM of the generated class.
+   * Adds a reusable query-level timestamp to the beginning of the SAM of the generated class.
    *
-   * <p> The timestamp value is evaluated once for one session, this
-   * function is generally used for batch job.
+   * <p> The timestamp value is evaluated once at query-start, this
+   * function is generally used in batch job.
    */
-  def addReusableSessionTimestamp(): String = {
-    val fieldTerm = s"sessionTimestamp"
+  def addReusableQueryLevelCurrentTimestamp(): String = {
+    val fieldTerm = s"queryStartTimestamp"
 
     val queryStartEpoch = tableConfig.getConfiguration
-      .getLong("__table.query-start.epoch-time__", 0)
-
-    val queryStartEpochStr = if (queryStartEpoch != 0) {
-      s"${queryStartEpoch}L"
-    } else {
-      throw new CodeGenException("Try  to obtain current session timestamp fail. " +
-        "This is a bug, please file an issue.")
-    }
-
-    reusableMemberStatements.add(s"""
-    |private static final $TIMESTAMP_DATA $fieldTerm =
-    |$TIMESTAMP_DATA.fromEpochMillis($queryStartEpochStr);
-    |""".stripMargin)
-    fieldTerm
-  }
-
-  /**
-   * Adds a reusable row-level time to the beginning of the SAM of
-   * the generated [[Function]].
-   */
-  def addReusableTime(): String = {
-    val fieldTerm = s"time"
-
-    val timestamp = addReusableTimestamp()
-
-    // declaration
-    reusableMemberStatements.add(s"private int $fieldTerm;")
-
-    // assignment
-    // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
-    val field =
-      s"""
-         |$fieldTerm = (int) ($timestamp.getMillisecond() % ${DateTimeUtils.MILLIS_PER_DAY});
-         |if (time < 0) {
-         |  time += ${DateTimeUtils.MILLIS_PER_DAY};
-         |}
-         |""".stripMargin
-    reusablePerRecordStatements.add(field)
-    fieldTerm
-  }
-
-  /**
-   * Adds a reusable session-level time to the beginning of the SAM of
-   * the generated [[Function]].
-   */
-  def addReusableSessionTime(): String = {
-    val fieldTerm = s"sessionTime"
-
-    val sessionTimestamp = addReusableSessionTimestamp()
-    reusableMemberStatements.add(
-      s"""
-      |private static final int $fieldTerm = (int)
-      | ($sessionTimestamp.getMillisecond() % ${DateTimeUtils.MILLIS_PER_DAY}) < 0 ?
-      | (int) ($sessionTimestamp.getMillisecond() %
-      | ${DateTimeUtils.MILLIS_PER_DAY}) + ${DateTimeUtils.MILLIS_PER_DAY} :
-      | (int) ($sessionTimestamp.getMillisecond() % ${DateTimeUtils.MILLIS_PER_DAY});
-      |""".stripMargin)
-    fieldTerm
-  }
-
-  /**
-   * Adds a reusable row-level current time to the beginning of the SAM of
-   * the generated [[Function]].
-   */
-  def addReusableCurrentTime(): String = {
-    val fieldTerm = s"currentTime"
-
-    val timestamp = addReusableTimestamp()
-    val sessionTimeZone = addReusableSessionTimeZone()
-
-    // declaration
-    reusableMemberStatements.add(s"private int $fieldTerm;")
-
-    // assignment
-    // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
-    val field =
-    s"""
-       |$fieldTerm = (int) (($timestamp.getMillisecond()
-       |  + $sessionTimeZone.getOffset($timestamp.getMillisecond()))
-       |  % ${DateTimeUtils.MILLIS_PER_DAY});
-       |if ($fieldTerm < 0) {
-       |  $fieldTerm += ${DateTimeUtils.MILLIS_PER_DAY};
-       |}
-       |""".stripMargin
-    reusablePerRecordStatements.add(field)
-    fieldTerm
-  }
-
-  /**
-   * Adds a reusable session-level current time to the beginning of the SAM of
-   * the generated [[Function]].
-   */
-  def addReusableSessionCurrentTime(): String = {
-    val fieldTerm = s"sessionCurrentTime"
-
-    val sessionTimestamp = addReusableSessionTimestamp()
-    val sessionTimeZone = addReusableSessionTimeZone()
+      .getOptional(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME)
+      .orElseThrow(
+        new JSupplier[Throwable] {
+          override def get() = new CodeGenException(
+            "Try to obtain epoch time of query-start fail." +
+              " This is a bug, please file an issue.")
+        }
+      )
 
     reusableMemberStatements.add(
       s"""
-          |private static final int $fieldTerm =
-          | ((int) (($sessionTimestamp.getMillisecond()
-          |  + $sessionTimeZone.getOffset($sessionTimestamp.getMillisecond()))
-          |  % ${DateTimeUtils.MILLIS_PER_DAY})) < 0 ?
-          | ((int) (($sessionTimestamp.getMillisecond()
-          |  + $sessionTimeZone.getOffset($sessionTimestamp.getMillisecond()))
-          |  % ${DateTimeUtils.MILLIS_PER_DAY}) + ${DateTimeUtils.MILLIS_PER_DAY}) :
-          |  (int) (($sessionTimestamp.getMillisecond()
-          |  + $sessionTimeZone.getOffset($sessionTimestamp.getMillisecond()))
-          |  % ${DateTimeUtils.MILLIS_PER_DAY});
+          |private static final $TIMESTAMP_DATA $fieldTerm =
+          |$TIMESTAMP_DATA.fromEpochMillis(${queryStartEpoch}L);
           |""".stripMargin)
     fieldTerm
   }
 
   /**
-    * Adds a reusable local date time to the beginning of the SAM of the generated class.
-    */
-  def addReusableLocalDateTime(): String = {
-    val fieldTerm = s"localtimestamp"
+   * Adds a reusable record-level local date time to the beginning of the
+   * SAM of the generated class.
+   *
+   * <p> The timestamp value is evaluated for per record, this
+   * function is generally used in stream job.
+   */
+  def addReusableRecordLevelLocalDateTime(): String = {
+    val fieldTerm = s"localTimestamp"
 
     val sessionTimeZone = addReusableSessionTimeZone()
-    val timestamp = addReusableTimestamp()
+    val timestamp = addReusableRecordLevelCurrentTimestamp()

Review comment:
       This should be right




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to