leonardBang commented on a change in pull request #15303:
URL: https://github.com/apache/flink/pull/15303#discussion_r601035897



##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
##########
@@ -20,52 +20,164 @@ package org.apache.flink.table.planner.expressions
 
 import java.sql.Time
 import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime, ZoneId}
+import java.time.{LocalDate, LocalDateTime, ZoneId}
 
+import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.ExecutionOptions
 import org.apache.flink.table.api._
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
 import org.apache.flink.types.Row
 
+import org.junit.Assert.assertEquals
 import org.junit.Test
 
+import scala.collection.mutable
+
 /**
   * Tests that check all non-deterministic functions can be executed.
   */
 class NonDeterministicTests extends ExpressionTestBase {
 
   @Test
-  def testCurrentDate(): Unit = {
+  def testCurrentDateTime(): Unit = {
     testAllApis(
       currentDate().isGreater("1970-01-01".toDate),
-      "currentDate() > '1970-01-01'.toDate",
       "CURRENT_DATE > DATE '1970-01-01'",
       "true")
-  }
 
-  @Test
-  def testCurrentTime(): Unit = {
     testAllApis(
       currentTime().isGreaterOrEqual("00:00:00".toTime),
-      "currentTime() >= '00:00:00'.toTime",
       "CURRENT_TIME >= TIME '00:00:00'",
       "true")
+
+    testAllApis(
+      currentTimestamp().isGreater(
+        "1970-01-01 
00:00:00".cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+      s"CURRENT_TIMESTAMP > ${timestampLtz("1970-01-01 00:00:00")}",
+      "true")
+
+    testSqlApi(s"NOW() > ${timestampLtz("1970-01-01 00:00:00")}",
+      "true")
   }
 
   @Test
-  def testCurrentTimestamp(): Unit = {
-    testAllApis(
-      currentTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp),
-      "currentTimestamp() > '1970-01-01 00:00:00'.toTimestamp",
-      "CURRENT_TIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'",
+  def testCurrentDateTimeInStreamMode(): Unit = {
+    val temporalFunctions = getCodeGenFunctions(List(
+      "CURRENT_DATE",
+      "CURRENT_TIME",
+      "CURRENT_TIMESTAMP",
+      "NOW()",
+      "LOCALTIME",
+      "LOCALTIMESTAMP"))
+    val round1 = evaluateFunctionResult(temporalFunctions)
+    Thread.sleep(1 * 1000L)
+    val round2: List[String] = evaluateFunctionResult(temporalFunctions)
+
+    assertEquals(round1.size, round2.size)
+    round1.zip(round2).zipWithIndex.foreach(r => {
+      // CURRENT_DATE may be same between two records
+      if (r._2 == 0) {
+        assert(r._1._1 <= r._1._2)
+      } else {
+        assert(r._1._1 < r._1._2)
+      }
+    })
+  }
+
+  @Test
+  def testCurrentDateTimeInBatchMode(): Unit = {
+    config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH)
+    config.getConfiguration.setLong("__table.query-start.epoch-time__", 1000L)
+    config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
+    val temporalFunctions = getCodeGenFunctions(List(
+      "CURRENT_DATE",
+      "CURRENT_TIME",
+      "CURRENT_TIMESTAMP",
+      "NOW()",
+      "LOCALTIME",
+      "LOCALTIMESTAMP"))
+
+    val expected = mutable.MutableList[String](
+      "1970-01-01",
+      "00:00:01",
+      "1970-01-01 08:00:01",
+      "1970-01-01 08:00:01",
+      "08:00:01",
+      "1970-01-01 08:00:01")
+
+    val result1 = evaluateFunctionResult(temporalFunctions)
+    assertEquals(expected.toList.sorted, result1.sorted)
+
+    Thread.sleep(1 * 1000L)
+    val result2: List[String] = evaluateFunctionResult(temporalFunctions)
+    assertEquals(expected.toList.sorted, result2.sorted)

Review comment:
       We need to check that batch mode always returns the same value for 
different records within one query.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to