leonardBang commented on a change in pull request #15303:
URL: https://github.com/apache/flink/pull/15303#discussion_r601035897
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
##########
@@ -20,52 +20,164 @@ package org.apache.flink.table.planner.expressions
import java.sql.Time
import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime, ZoneId}
+import java.time.{LocalDate, LocalDateTime, ZoneId}
+import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
import org.junit.Test
+import scala.collection.mutable
+
/**
* Tests that check all non-deterministic functions can be executed.
*/
class NonDeterministicTests extends ExpressionTestBase {
@Test
- def testCurrentDate(): Unit = {
+ def testCurrentDateTime(): Unit = {
testAllApis(
currentDate().isGreater("1970-01-01".toDate),
- "currentDate() > '1970-01-01'.toDate",
"CURRENT_DATE > DATE '1970-01-01'",
"true")
- }
- @Test
- def testCurrentTime(): Unit = {
testAllApis(
currentTime().isGreaterOrEqual("00:00:00".toTime),
- "currentTime() >= '00:00:00'.toTime",
"CURRENT_TIME >= TIME '00:00:00'",
"true")
+
+ testAllApis(
+ currentTimestamp().isGreater(
+ "1970-01-01
00:00:00".cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+ s"CURRENT_TIMESTAMP > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
+
+ testSqlApi(s"NOW() > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
}
@Test
- def testCurrentTimestamp(): Unit = {
- testAllApis(
- currentTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp),
- "currentTimestamp() > '1970-01-01 00:00:00'.toTimestamp",
- "CURRENT_TIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'",
+ def testCurrentDateTimeInStreamMode(): Unit = {
+ val temporalFunctions = getCodeGenFunctions(List(
+ "CURRENT_DATE",
+ "CURRENT_TIME",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "LOCALTIME",
+ "LOCALTIMESTAMP"))
+ val round1 = evaluateFunctionResult(temporalFunctions)
+ Thread.sleep(1 * 1000L)
+ val round2: List[String] = evaluateFunctionResult(temporalFunctions)
+
+ assertEquals(round1.size, round2.size)
+ round1.zip(round2).zipWithIndex.foreach(r => {
+ // CURRENT_DATE may be same between two records
+ if (r._2 == 0) {
+ assert(r._1._1 <= r._1._2)
+ } else {
+ assert(r._1._1 < r._1._2)
+ }
+ })
+ }
+
+ @Test
+ def testCurrentDateTimeInBatchMode(): Unit = {
+ config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH)
+ config.getConfiguration.setLong("__table.query-start.epoch-time__", 1000L)
+ config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
+ val temporalFunctions = getCodeGenFunctions(List(
+ "CURRENT_DATE",
+ "CURRENT_TIME",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "LOCALTIME",
+ "LOCALTIMESTAMP"))
+
+ val expected = mutable.MutableList[String](
+ "1970-01-01",
+ "00:00:01",
+ "1970-01-01 08:00:01",
+ "1970-01-01 08:00:01",
+ "08:00:01",
+ "1970-01-01 08:00:01")
+
+ val result1 = evaluateFunctionResult(temporalFunctions)
+ assertEquals(expected.toList.sorted, result1.sorted)
+
+ Thread.sleep(1 * 1000L)
+ val result2: List[String] = evaluateFunctionResult(temporalFunctions)
+ assertEquals(expected.toList.sorted, result2.sorted)
Review comment:
We need to check that batch mode always returns the same value
for different records within one query.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]