wuchong commented on a change in pull request #15303:
URL: https://github.com/apache/flink/pull/15303#discussion_r600215528
##########
File path: docs/data/sql_functions.yml
##########
@@ -420,13 +420,18 @@ temporal:
description: Returns the current SQL date in the UTC time zone.
- sql: CURRENT_TIME
table: currentTime()
- description: Returns the current SQL time in the UTC time zone.
+ description: Returns the current SQL time in the local time zone. This
function is not deterministic, it would be evaluated for each record in stream
job, it would be evaluated once in batch job at query start.
+ - sql: CURRENT_TIMESTAMP
+ table: currentTimestamp()
+ description: Returns the current SQL timestamp in the local time zone, the
return type is TIMESTAMP WITH LOCAL TIME ZONE. This function is not
deterministic, it would be evaluated for each record in stream job, it would be
evaluated once in batch job at query start.
- sql: LOCALTIME
table: localTime()
- description: Returns the current SQL timestamp in the UTC time zone.
+ description: Returns the current SQL timestamp in the local time zone,
this is a synonym of CURRENT_TIME. This function is not deterministic, it would
be evaluated for each record in stream job, it would be evaluated once in batch
job at query start.
Review comment:
Returns the current SQL **time** in the local time zone?
##########
File path: docs/data/sql_functions.yml
##########
@@ -420,13 +420,18 @@ temporal:
description: Returns the current SQL date in the UTC time zone.
- sql: CURRENT_TIME
table: currentTime()
- description: Returns the current SQL time in the UTC time zone.
+ description: Returns the current SQL time in the local time zone. This
function is not deterministic, it would be evaluated for each record in stream
job, it would be evaluated once in batch job at query start.
+ - sql: CURRENT_TIMESTAMP
+ table: currentTimestamp()
+ description: Returns the current SQL timestamp in the local time zone, the
return type is TIMESTAMP WITH LOCAL TIME ZONE. This function is not
deterministic, it would be evaluated for each record in stream job, it would be
evaluated once in batch job at query start.
- sql: LOCALTIME
table: localTime()
- description: Returns the current SQL timestamp in the UTC time zone.
+ description: Returns the current SQL timestamp in the local time zone,
this is a synonym of CURRENT_TIME. This function is not deterministic, it would
be evaluated for each record in stream job, it would be evaluated once in batch
job at query start.
- sql: LOCALTIMESTAMP
table: localTimestamp()
- description: Returns the current SQL timestamp in local time zone.
+ description: Returns the current SQL timestamp in local time zone, the
return type is TIMESTAMP. This function is not deterministic, it would be
evaluated for each record in stream job, it would be evaluated once in batch
job at query start.
+ - sql: NOW()
+ description: Returns the current SQL timestamp in the local time zone,
this is a synonym of CURRENT_TIME. This function is not deterministic, it would
be evaluated for each record in stream job, it would be evaluated once in batch
job at query start.
Review comment:
synonym of CURRENT_TIMESTAMP?
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCurrentTimestampFunction.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.fun.SqlAbstractTimeFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Function that returns current timestamp, the function return type is {@link
+ * SqlTypeName#TIMESTAMP_WITH_LOCAL_TIME_ZONE}.
+ */
+public class SqlCurrentTimestampFunction extends SqlAbstractTimeFunction {
+
+ public SqlCurrentTimestampFunction(String name, SqlTypeName typeName) {
+ // access protected constructor
+ super(name, typeName);
+ }
+
+ @Override
+ public SqlSyntax getSyntax() {
+ return SqlSyntax.FUNCTION_ID;
+ }
Review comment:
Should we override `isDeterministic` to `false`?
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCurrentTimestampFunction.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.fun.SqlAbstractTimeFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Function that returns current timestamp, the function return type is {@link
+ * SqlTypeName#TIMESTAMP_WITH_LOCAL_TIME_ZONE}.
Review comment:
If the return type is deterministic, could you hard code the return type
in the constructor?
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
##########
@@ -20,52 +20,164 @@ package org.apache.flink.table.planner.expressions
import java.sql.Time
import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime, ZoneId}
+import java.time.{LocalDate, LocalDateTime, ZoneId}
+import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
import org.junit.Test
+import scala.collection.mutable
+
/**
* Tests that check all non-deterministic functions can be executed.
*/
class NonDeterministicTests extends ExpressionTestBase {
@Test
- def testCurrentDate(): Unit = {
+ def testCurrentDateTime(): Unit = {
testAllApis(
currentDate().isGreater("1970-01-01".toDate),
- "currentDate() > '1970-01-01'.toDate",
"CURRENT_DATE > DATE '1970-01-01'",
"true")
- }
- @Test
- def testCurrentTime(): Unit = {
testAllApis(
currentTime().isGreaterOrEqual("00:00:00".toTime),
- "currentTime() >= '00:00:00'.toTime",
"CURRENT_TIME >= TIME '00:00:00'",
"true")
+
+ testAllApis(
+ currentTimestamp().isGreater(
+ "1970-01-01
00:00:00".cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+ s"CURRENT_TIMESTAMP > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
+
+ testSqlApi(s"NOW() > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
}
@Test
- def testCurrentTimestamp(): Unit = {
- testAllApis(
- currentTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp),
- "currentTimestamp() > '1970-01-01 00:00:00'.toTimestamp",
- "CURRENT_TIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'",
+ def testCurrentDateTimeInStreamMode(): Unit = {
+ val temporalFunctions = getCodeGenFunctions(List(
+ "CURRENT_DATE",
+ "CURRENT_TIME",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "LOCALTIME",
+ "LOCALTIMESTAMP"))
+ val round1 = evaluateFunctionResult(temporalFunctions)
+ Thread.sleep(1 * 1000L)
+ val round2: List[String] = evaluateFunctionResult(temporalFunctions)
+
+ assertEquals(round1.size, round2.size)
+ round1.zip(round2).zipWithIndex.foreach(r => {
+ // CURRENT_DATE may be same between two records
+ if (r._2 == 0) {
+ assert(r._1._1 <= r._1._2)
+ } else {
+ assert(r._1._1 < r._1._2)
+ }
+ })
+ }
+
+ @Test
+ def testCurrentDateTimeInBatchMode(): Unit = {
+ config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH)
+ config.getConfiguration.setLong("__table.query-start.epoch-time__", 1000L)
+ config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
+ val temporalFunctions = getCodeGenFunctions(List(
+ "CURRENT_DATE",
+ "CURRENT_TIME",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "LOCALTIME",
+ "LOCALTIMESTAMP"))
+
+ val expected = mutable.MutableList[String](
+ "1970-01-01",
+ "00:00:01",
+ "1970-01-01 08:00:01",
+ "1970-01-01 08:00:01",
+ "08:00:01",
+ "1970-01-01 08:00:01")
+
+ val result1 = evaluateFunctionResult(temporalFunctions)
+ assertEquals(expected.toList.sorted, result1.sorted)
+
+ Thread.sleep(1 * 1000L)
+ val result2: List[String] = evaluateFunctionResult(temporalFunctions)
+ assertEquals(expected.toList.sorted, result2.sorted)
+ }
+
+ @Test
+ def testCurrentTimestampInUTC(): Unit = {
+ config.setLocalTimeZone(ZoneId.of("UTC"))
+ val localDateTime = LocalDateTime.now(ZoneId.of("UTC"))
+
+ val formattedCurrentDate = localDateTime
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
+ val formattedCurrentTime = localDateTime
+ .toLocalTime
+ .format(DateTimeFormatter.ofPattern("HH:mm:ss"))
+ val formattedCurrentTimestamp = localDateTime
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
+
+ // the CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP/NOW() functions are
+ // not deterministic, thus we
+ // use following pattern to check the returned SQL timestamp in session
time zone UTC
+ testSqlApi(
+ s"DATE_SUB(CURRENT_DATE, DATE '$formattedCurrentDate') = 0",
+ "true")
+
+ testSqlApi(
+ s"TIME_SUB(CURRENT_TIME, TIME '$formattedCurrentTime') <= 60000",
+ "true")
+
+ testSqlApi(
+ s"${timestampLtz(formattedCurrentTimestamp)} < CURRENT_TIMESTAMP",
+ "true")
+
+ testSqlApi(
+ s"${timestampLtz(formattedCurrentTimestamp)} < NOW()",
Review comment:
Why not use `timestampdiff` to check value?
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
##########
@@ -550,6 +690,77 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
fieldTerm
}
+ /**
+ * Adds a reusable session-level date to the beginning of the SAM of the
generated class.
+ */
+ def addReusableSessionDate(): String = {
+ val fieldTerm = s"sessionDate"
+
+ val sessionTimestamp = addReusableSessionTimestamp()
+ val sessionTime = addReusableSessionTime()
+ reusableMemberStatements.add(
+ s"""
+ |private static final int $fieldTerm =
+ | $sessionTime < 0 ?
+ | (int) ($sessionTimestamp.getMillisecond() /
${DateTimeUtils.MILLIS_PER_DAY}) - 1 :
+ | (int) ($sessionTimestamp.getMillisecond() /
${DateTimeUtils.MILLIS_PER_DAY});
+ |""".stripMargin)
+
+ fieldTerm
+ }
+
+ /**
+ * Adds a reusable row-level current date to the beginning of the SAM of the
generated class.
+ */
+ def addReusableCurrentDate(): String = {
Review comment:
There should be only one date method, why we have
`addReusableCurrentDate` and `addReusableDate`. And this method is never used.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
##########
@@ -455,16 +455,21 @@ abstract class PlannerBase(
* the configuration before planner do optimization with [[ModifyOperation]]
or other works.
*/
protected def validateAndOverrideConfiguration(): Unit = {
- if
(!config.getConfiguration.get(TableConfigOptions.TABLE_PLANNER).equals(PlannerType.BLINK))
{
+ val configuration = config.getConfiguration
+ if
(!configuration.get(TableConfigOptions.TABLE_PLANNER).equals(PlannerType.BLINK))
{
throw new IllegalArgumentException(
"Mismatch between configured planner and actual planner. " +
"Currently, the 'table.planner' can only be set when instantiating
the " +
"table environment. Subsequent changes are not supported. " +
"Please instantiate a new TableEnvironment if necessary.");
}
+ // Add a query start time to TableConfig, this config is only used
internal,
+ // this config will be used by temporal functions like CURRENT_TIMESTAMP.
+ configuration.setLong("__table.query-start.epoch-time__",
System.currentTimeMillis())
Review comment:
I think so. We should remove this flag again, otherwise, users may see
this confusing config when list configurations or use `SET` command in SQL
Client.
##########
File path: docs/data/sql_functions.yml
##########
@@ -420,13 +420,18 @@ temporal:
description: Returns the current SQL date in the UTC time zone.
Review comment:
Should we also update the description about evaluation for the
CURRENT_DATE?
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCurrentTimestampFunction.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.fun.SqlAbstractTimeFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Function that returns current timestamp, the function return type is {@link
+ * SqlTypeName#TIMESTAMP_WITH_LOCAL_TIME_ZONE}.
+ */
+public class SqlCurrentTimestampFunction extends SqlAbstractTimeFunction {
+
+ public SqlCurrentTimestampFunction(String name, SqlTypeName typeName) {
+ // access protected constructor
+ super(name, typeName);
+ }
+
+ @Override
+ public SqlSyntax getSyntax() {
+ return SqlSyntax.FUNCTION_ID;
Review comment:
not need to override?
##########
File path:
flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
##########
@@ -421,35 +421,41 @@ trait ImplicitExpressionConversions {
}
/**
- * Returns the current SQL date in UTC time zone.
+ * Returns the current SQL date in local time zone,
+ * the underlying function return type is [[DataTypes.DATE]].
*/
def currentDate(): Expression = {
Expressions.currentDate()
}
/**
- * Returns the current SQL time in UTC time zone.
+ * Returns the current SQL time in local time zone,
+ * the underlying function return type is [[DataTypes.TIME]].
Review comment:
the return type of this expression is [[DataTypes.TIME]].
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
##########
@@ -481,6 +511,79 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
fieldTerm
}
+ /**
+ * Adds a reusable session-level time to the beginning of the SAM of
+ * the generated [[Function]].
+ */
+ def addReusableSessionTime(): String = {
+ val fieldTerm = s"sessionTime"
+
+ val sessionTimestamp = addReusableSessionTimestamp()
+ reusableMemberStatements.add(
+ s"""
+ |private static final int $fieldTerm = (int)
+ | ($sessionTimestamp.getMillisecond() % ${DateTimeUtils.MILLIS_PER_DAY})
< 0 ?
Review comment:
I don't know why `org.apache.calcite.runtime.SqlFunctions#currentTime()`
considered negative millisecond. But I think the current millisecond should
never small than zero, so the logic can be simplified into `(int)
(localTimestamp % DateTimeUtils.MILLIS_PER_DAY)`.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
##########
@@ -505,8 +608,27 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}
/**
- * Adds a reusable local time to the beginning of the SAM of the generated
class.
- */
+ * Adds a reusable session-level local date time to the beginning of
+ * the SAM of the generated class.
+ */
+ def addReusableSessionLocalDateTime(): String = {
+ val fieldTerm = s"sessionLocaltimestamp"
+
+ val sessionTimeZone = addReusableSessionTimeZone()
+ val sessionLocaltimestamp = addReusableSessionTimestamp()
+
+ reusableMemberStatements.add(
+ s"""
+ |private static final $TIMESTAMP_DATA $fieldTerm =
$TIMESTAMP_DATA.fromEpochMillis(
+ | $sessionLocaltimestamp.getMillisecond()
+ | + $sessionTimeZone.getOffset($sessionLocaltimestamp.getMillisecond()));
Review comment:
This looks quite error-prone, because `addReusableSessionTimestamp`
generates a currentTimestamp insteead of localTimestamp.
Personally, I perfer to pass both long values of currentTimestamp and
localTimestamp through TableConfig. This can avoid calculating localTimestamp
every time when used which is error-prone. Calcite also does like this, see
`org.apache.calcite.DataContext.Variable#LOCAL_TIMESTAMP`.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
##########
@@ -458,8 +461,35 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}
/**
- * Adds a reusable time to the beginning of the SAM of the generated
[[Function]].
- */
+ * Adds a reusable session-level timestamp to the beginning of the SAM of
the generated class.
+ *
+ * <p> The timestamp value is evaluated once for one session, this
+ * function is generally used for batch job.
+ */
+ def addReusableSessionTimestamp(): String = {
Review comment:
Personally, I don't think this is a session-level timestamp. Because
every query submitted in the same session have different timestamp value.
I would suggest to explicitly mention per-record and query-start in the
method name, e.g. `addReusableQueryLevelTimestamp` and
`addReusableRecordLevelTimestamp`.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
##########
@@ -458,8 +461,35 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}
/**
- * Adds a reusable time to the beginning of the SAM of the generated
[[Function]].
- */
+ * Adds a reusable session-level timestamp to the beginning of the SAM of
the generated class.
+ *
+ * <p> The timestamp value is evaluated once for one session, this
+ * function is generally used for batch job.
+ */
+ def addReusableSessionTimestamp(): String = {
+ val fieldTerm = s"sessionTimestamp"
+
+ val queryStartEpoch = tableConfig.getConfiguration
+ .getLong("__table.query-start.epoch-time__", 0)
Review comment:
Would be better to have an internal `Option`, so that
1. we can maintain the key in one place
2. we can use `getOptional(..).orElseThrow()` here
##########
File path: docs/data/sql_functions.yml
##########
@@ -420,13 +420,18 @@ temporal:
description: Returns the current SQL date in the UTC time zone.
- sql: CURRENT_TIME
table: currentTime()
- description: Returns the current SQL time in the UTC time zone.
+ description: Returns the current SQL time in the local time zone. This
function is not deterministic, it would be evaluated for each record in stream
job, it would be evaluated once in batch job at query start.
Review comment:
What do you think about:
It is evaluated for each record in streaming mode. But in batch mode, it is
evaluated once as the query starts and uses the same result for every row.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
##########
@@ -458,8 +461,35 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}
/**
- * Adds a reusable time to the beginning of the SAM of the generated
[[Function]].
- */
+ * Adds a reusable session-level timestamp to the beginning of the SAM of
the generated class.
+ *
+ * <p> The timestamp value is evaluated once for one session, this
+ * function is generally used for batch job.
+ */
+ def addReusableSessionTimestamp(): String = {
+ val fieldTerm = s"sessionTimestamp"
+
+ val queryStartEpoch = tableConfig.getConfiguration
+ .getLong("__table.query-start.epoch-time__", 0)
+
+ val queryStartEpochStr = if (queryStartEpoch != 0) {
+ s"${queryStartEpoch}L"
+ } else {
+ throw new CodeGenException("Try to obtain current session timestamp
fail. " +
+ "This is a bug, please file an issue.")
+ }
+
+ reusableMemberStatements.add(s"""
+ |private static final $TIMESTAMP_DATA $fieldTerm =
+ |$TIMESTAMP_DATA.fromEpochMillis($queryStartEpochStr);
+ |""".stripMargin)
Review comment:
The code style should be
```scala
reusableMemberStatements.add(
s"""
|private static final $TIMESTAMP_DATA $fieldTerm =
|$TIMESTAMP_DATA.fromEpochMillis($queryStartEpochStr);
|""".stripMargin)
```
##########
File path: docs/data/sql_functions.yml
##########
@@ -420,13 +420,18 @@ temporal:
description: Returns the current SQL date in the UTC time zone.
- sql: CURRENT_TIME
table: currentTime()
- description: Returns the current SQL time in the UTC time zone.
+ description: Returns the current SQL time in the local time zone. This
function is not deterministic, it would be evaluated for each record in stream
job, it would be evaluated once in batch job at query start.
+ - sql: CURRENT_TIMESTAMP
+ table: currentTimestamp()
+ description: Returns the current SQL timestamp in the local time zone, the
return type is TIMESTAMP WITH LOCAL TIME ZONE. This function is not
deterministic, it would be evaluated for each record in stream job, it would be
evaluated once in batch job at query start.
- sql: LOCALTIME
table: localTime()
- description: Returns the current SQL timestamp in the UTC time zone.
+ description: Returns the current SQL timestamp in the local time zone,
this is a synonym of CURRENT_TIME. This function is not deterministic, it would
be evaluated for each record in stream job, it would be evaluated once in batch
job at query start.
- sql: LOCALTIMESTAMP
table: localTimestamp()
- description: Returns the current SQL timestamp in local time zone.
+ description: Returns the current SQL timestamp in local time zone, the
return type is TIMESTAMP. This function is not deterministic, it would be
evaluated for each record in stream job, it would be evaluated once in batch
job at query start.
Review comment:
"return type is TIMESTAMP WITHOUT TIME ZONE" to make it more distinguish
with return typ e of CURRENT_TIMESTAMP?
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
##########
@@ -526,7 +648,25 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}
/**
- * Adds a reusable date to the beginning of the SAM of the generated class.
+ * Adds a reusable session-level local time to the beginning of
+ * the SAM of the generated class.
+ */
+ def addReusableSessionLocalTime(): String = {
Review comment:
`CURRENT_TIME` is a synonym of `LOCALTIME`, the implementation should be
same. Why we have two implementations?
I also tested `CURRENT_TIME` and the result is not correct.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
##########
@@ -208,6 +209,63 @@ abstract class ExpressionTestBase {
invalidTableApiExprs += ((expr, keywords, clazz))
}
+ // return the codegen function instances
+ def getCodeGenFunctions(sqlExprs: List[String]) : MapFunction[RowData,
BinaryRowData] = {
+ val testSqlExprs = mutable.ArrayBuffer[(String, RexNode, String)]()
+ sqlExprs.foreach(exp => addSqlTestExpr(exp, null, testSqlExprs, null))
+ getCodeGenFunction(testSqlExprs.map(r => r._2).toList)
+ }
+
+ // return the codegen function instances
+ def evaluateFunctionResult(mapper: MapFunction[RowData, BinaryRowData])
+ : List[String] = {
+ val isRichFunction = mapper.isInstanceOf[RichFunction]
+
+ // call setRuntimeContext method and open method for RichFunction
+ if (isRichFunction) {
+ val richMapper = mapper.asInstanceOf[RichMapFunction[_, _]]
+ val t = new RuntimeUDFContext(
+ new TaskInfo("ExpressionTest", 1, 0, 1, 1),
+ classOf[ExpressionTestBase].getClassLoader,
+ env.getConfig,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ null)
+ richMapper.setRuntimeContext(t)
+ richMapper.open(new Configuration())
+ }
+
+ val testRow = if (containsLegacyTypes) {
+ val converter = DataFormatConverters
+ .getConverterForDataType(resolvedDataType)
+ .asInstanceOf[DataFormatConverter[RowData, Row]]
+ converter.toInternal(testData)
+ } else {
+ val converter = DataStructureConverters
+ .getConverter(resolvedDataType)
+ .asInstanceOf[DataStructureConverter[RowData, Row]]
+ converter.toInternalOrNull(testData)
+ }
+ val result = mapper.map(testRow)
+
+ // call close method for RichFunction
+ if (isRichFunction) {
+ mapper.asInstanceOf[RichMapFunction[_, _]].close()
+ }
+
+ val resultList = new ListBuffer[String]()
+ for (index <- 0 to result.getArity - 1) {
Review comment:
Simplify to `for (index <- 0 until result.getArity)`.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
##########
@@ -20,52 +20,164 @@ package org.apache.flink.table.planner.expressions
import java.sql.Time
import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime, ZoneId}
+import java.time.{LocalDate, LocalDateTime, ZoneId}
+import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
import org.junit.Test
+import scala.collection.mutable
+
/**
* Tests that check all non-deterministic functions can be executed.
*/
class NonDeterministicTests extends ExpressionTestBase {
@Test
- def testCurrentDate(): Unit = {
+ def testCurrentDateTime(): Unit = {
testAllApis(
currentDate().isGreater("1970-01-01".toDate),
- "currentDate() > '1970-01-01'.toDate",
"CURRENT_DATE > DATE '1970-01-01'",
"true")
- }
- @Test
- def testCurrentTime(): Unit = {
testAllApis(
currentTime().isGreaterOrEqual("00:00:00".toTime),
- "currentTime() >= '00:00:00'.toTime",
"CURRENT_TIME >= TIME '00:00:00'",
"true")
+
+ testAllApis(
+ currentTimestamp().isGreater(
+ "1970-01-01
00:00:00".cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+ s"CURRENT_TIMESTAMP > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
+
+ testSqlApi(s"NOW() > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
}
@Test
- def testCurrentTimestamp(): Unit = {
- testAllApis(
- currentTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp),
- "currentTimestamp() > '1970-01-01 00:00:00'.toTimestamp",
- "CURRENT_TIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'",
+ def testCurrentDateTimeInStreamMode(): Unit = {
+ val temporalFunctions = getCodeGenFunctions(List(
+ "CURRENT_DATE",
+ "CURRENT_TIME",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "LOCALTIME",
+ "LOCALTIMESTAMP"))
+ val round1 = evaluateFunctionResult(temporalFunctions)
+ Thread.sleep(1 * 1000L)
+ val round2: List[String] = evaluateFunctionResult(temporalFunctions)
+
+ assertEquals(round1.size, round2.size)
+ round1.zip(round2).zipWithIndex.foreach(r => {
+ // CURRENT_DATE may be same between two records
+ if (r._2 == 0) {
+ assert(r._1._1 <= r._1._2)
+ } else {
+ assert(r._1._1 < r._1._2)
+ }
+ })
+ }
+
+ @Test
+ def testCurrentDateTimeInBatchMode(): Unit = {
+ config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH)
+ config.getConfiguration.setLong("__table.query-start.epoch-time__", 1000L)
+ config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
+ val temporalFunctions = getCodeGenFunctions(List(
+ "CURRENT_DATE",
+ "CURRENT_TIME",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "LOCALTIME",
+ "LOCALTIMESTAMP"))
+
+ val expected = mutable.MutableList[String](
+ "1970-01-01",
+ "00:00:01",
Review comment:
This is incorrect.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
##########
@@ -20,52 +20,164 @@ package org.apache.flink.table.planner.expressions
import java.sql.Time
import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime, ZoneId}
+import java.time.{LocalDate, LocalDateTime, ZoneId}
+import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
import org.junit.Test
+import scala.collection.mutable
+
/**
* Tests that check all non-deterministic functions can be executed.
*/
class NonDeterministicTests extends ExpressionTestBase {
@Test
- def testCurrentDate(): Unit = {
+ def testCurrentDateTime(): Unit = {
testAllApis(
currentDate().isGreater("1970-01-01".toDate),
- "currentDate() > '1970-01-01'.toDate",
"CURRENT_DATE > DATE '1970-01-01'",
"true")
- }
- @Test
- def testCurrentTime(): Unit = {
testAllApis(
currentTime().isGreaterOrEqual("00:00:00".toTime),
- "currentTime() >= '00:00:00'.toTime",
"CURRENT_TIME >= TIME '00:00:00'",
"true")
+
+ testAllApis(
+ currentTimestamp().isGreater(
+ "1970-01-01
00:00:00".cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+ s"CURRENT_TIMESTAMP > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
+
+ testSqlApi(s"NOW() > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
}
@Test
- def testCurrentTimestamp(): Unit = {
- testAllApis(
- currentTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp),
- "currentTimestamp() > '1970-01-01 00:00:00'.toTimestamp",
- "CURRENT_TIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'",
+ def testCurrentDateTimeInStreamMode(): Unit = {
+ val temporalFunctions = getCodeGenFunctions(List(
+ "CURRENT_DATE",
+ "CURRENT_TIME",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "LOCALTIME",
+ "LOCALTIMESTAMP"))
+ val round1 = evaluateFunctionResult(temporalFunctions)
+ Thread.sleep(1 * 1000L)
+ val round2: List[String] = evaluateFunctionResult(temporalFunctions)
+
+ assertEquals(round1.size, round2.size)
+ round1.zip(round2).zipWithIndex.foreach(r => {
+ // CURRENT_DATE may be same between two records
+ if (r._2 == 0) {
+ assert(r._1._1 <= r._1._2)
+ } else {
+ assert(r._1._1 < r._1._2)
+ }
+ })
+ }
+
+ @Test
+ def testCurrentDateTimeInBatchMode(): Unit = {
+ config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH)
+ config.getConfiguration.setLong("__table.query-start.epoch-time__", 1000L)
+ config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
+ val temporalFunctions = getCodeGenFunctions(List(
+ "CURRENT_DATE",
+ "CURRENT_TIME",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "LOCALTIME",
+ "LOCALTIMESTAMP"))
+
+ val expected = mutable.MutableList[String](
+ "1970-01-01",
+ "00:00:01",
+ "1970-01-01 08:00:01",
+ "1970-01-01 08:00:01",
+ "08:00:01",
+ "1970-01-01 08:00:01")
+
+ val result1 = evaluateFunctionResult(temporalFunctions)
+ assertEquals(expected.toList.sorted, result1.sorted)
+
+ Thread.sleep(1 * 1000L)
+ val result2: List[String] = evaluateFunctionResult(temporalFunctions)
+ assertEquals(expected.toList.sorted, result2.sorted)
Review comment:
Personally, I think we don't need the second round. Becuase CI runs at
different time can already cover this.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
##########
@@ -20,52 +20,164 @@ package org.apache.flink.table.planner.expressions
import java.sql.Time
import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime, ZoneId}
+import java.time.{LocalDate, LocalDateTime, ZoneId}
+import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
import org.junit.Test
+import scala.collection.mutable
+
/**
* Tests that check all non-deterministic functions can be executed.
*/
class NonDeterministicTests extends ExpressionTestBase {
@Test
- def testCurrentDate(): Unit = {
+ def testCurrentDateTime(): Unit = {
testAllApis(
currentDate().isGreater("1970-01-01".toDate),
- "currentDate() > '1970-01-01'.toDate",
"CURRENT_DATE > DATE '1970-01-01'",
"true")
- }
- @Test
- def testCurrentTime(): Unit = {
testAllApis(
currentTime().isGreaterOrEqual("00:00:00".toTime),
- "currentTime() >= '00:00:00'.toTime",
"CURRENT_TIME >= TIME '00:00:00'",
"true")
+
+ testAllApis(
+ currentTimestamp().isGreater(
+ "1970-01-01
00:00:00".cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+ s"CURRENT_TIMESTAMP > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
+
+ testSqlApi(s"NOW() > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
}
@Test
- def testCurrentTimestamp(): Unit = {
- testAllApis(
- currentTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp),
- "currentTimestamp() > '1970-01-01 00:00:00'.toTimestamp",
- "CURRENT_TIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'",
+ def testCurrentDateTimeInStreamMode(): Unit = {
+ val temporalFunctions = getCodeGenFunctions(List(
+ "CURRENT_DATE",
+ "CURRENT_TIME",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "LOCALTIME",
+ "LOCALTIMESTAMP"))
+ val round1 = evaluateFunctionResult(temporalFunctions)
+ Thread.sleep(1 * 1000L)
+ val round2: List[String] = evaluateFunctionResult(temporalFunctions)
+
+ assertEquals(round1.size, round2.size)
+ round1.zip(round2).zipWithIndex.foreach(r => {
+ // CURRENT_DATE may be same between two records
+ if (r._2 == 0) {
+ assert(r._1._1 <= r._1._2)
+ } else {
+ assert(r._1._1 < r._1._2)
+ }
Review comment:
What a magic code!!! Please improve the code readability.
```suggestion
round1.zip(round2).zipWithIndex.foreach {
case ((result1: String, result2: String), index: Int) => {
// CURRENT_DATE may be same between two records
if (index == 0) {
assert(result1 <= result2)
} else {
assert(result2 < result2)
}
}
}
```
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/CurrentTimePointCallGen.scala
##########
@@ -21,38 +21,67 @@ package org.apache.flink.table.planner.codegen.calls
import
org.apache.flink.table.planner.codegen.GenerateUtils.generateNonNullField
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext,
GeneratedExpression}
import org.apache.flink.table.types.logical.LogicalType
-import org.apache.flink.table.types.logical.LogicalTypeRoot.{DATE,
TIMESTAMP_WITHOUT_TIME_ZONE, TIME_WITHOUT_TIME_ZONE}
+import org.apache.flink.table.types.logical.LogicalTypeRoot.{DATE,
TIMESTAMP_WITHOUT_TIME_ZONE,TIMESTAMP_WITH_LOCAL_TIME_ZONE,
TIME_WITHOUT_TIME_ZONE}
/**
* Generates function call to determine current time point (as
date/time/timestamp) in
* local timezone or not.
*/
-class CurrentTimePointCallGen(local: Boolean) extends CallGenerator {
+class CurrentTimePointCallGen(local: Boolean, isStreaming: Boolean = true)
extends CallGenerator {
Review comment:
I think we don't need provide default value for `isStreaming`, because
only one place using it.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
##########
@@ -20,52 +20,164 @@ package org.apache.flink.table.planner.expressions
import java.sql.Time
import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime, ZoneId}
+import java.time.{LocalDate, LocalDateTime, ZoneId}
+import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
import org.junit.Test
+import scala.collection.mutable
+
/**
* Tests that check all non-deterministic functions can be executed.
*/
class NonDeterministicTests extends ExpressionTestBase {
@Test
- def testCurrentDate(): Unit = {
+ def testCurrentDateTime(): Unit = {
testAllApis(
currentDate().isGreater("1970-01-01".toDate),
- "currentDate() > '1970-01-01'.toDate",
"CURRENT_DATE > DATE '1970-01-01'",
"true")
- }
- @Test
- def testCurrentTime(): Unit = {
testAllApis(
currentTime().isGreaterOrEqual("00:00:00".toTime),
- "currentTime() >= '00:00:00'.toTime",
"CURRENT_TIME >= TIME '00:00:00'",
"true")
+
+ testAllApis(
+ currentTimestamp().isGreater(
+ "1970-01-01
00:00:00".cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+ s"CURRENT_TIMESTAMP > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
+
+ testSqlApi(s"NOW() > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
}
@Test
- def testCurrentTimestamp(): Unit = {
- testAllApis(
- currentTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp),
- "currentTimestamp() > '1970-01-01 00:00:00'.toTimestamp",
- "CURRENT_TIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'",
+ def testCurrentDateTimeInStreamMode(): Unit = {
+ val temporalFunctions = getCodeGenFunctions(List(
+ "CURRENT_DATE",
+ "CURRENT_TIME",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "LOCALTIME",
+ "LOCALTIMESTAMP"))
+ val round1 = evaluateFunctionResult(temporalFunctions)
+ Thread.sleep(1 * 1000L)
+ val round2: List[String] = evaluateFunctionResult(temporalFunctions)
+
+ assertEquals(round1.size, round2.size)
+ round1.zip(round2).zipWithIndex.foreach(r => {
+ // CURRENT_DATE may be same between two records
+ if (r._2 == 0) {
+ assert(r._1._1 <= r._1._2)
+ } else {
+ assert(r._1._1 < r._1._2)
+ }
+ })
+ }
+
+ @Test
+ def testCurrentDateTimeInBatchMode(): Unit = {
+ config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH)
+ config.getConfiguration.setLong("__table.query-start.epoch-time__", 1000L)
+ config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
+ val temporalFunctions = getCodeGenFunctions(List(
+ "CURRENT_DATE",
+ "CURRENT_TIME",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "LOCALTIME",
+ "LOCALTIMESTAMP"))
+
+ val expected = mutable.MutableList[String](
+ "1970-01-01",
+ "00:00:01",
+ "1970-01-01 08:00:01",
+ "1970-01-01 08:00:01",
+ "08:00:01",
+ "1970-01-01 08:00:01")
+
+ val result1 = evaluateFunctionResult(temporalFunctions)
+ assertEquals(expected.toList.sorted, result1.sorted)
+
+ Thread.sleep(1 * 1000L)
+ val result2: List[String] = evaluateFunctionResult(temporalFunctions)
+ assertEquals(expected.toList.sorted, result2.sorted)
+ }
+
+ @Test
+ def testCurrentTimestampInUTC(): Unit = {
+ config.setLocalTimeZone(ZoneId.of("UTC"))
+ val localDateTime = LocalDateTime.now(ZoneId.of("UTC"))
+
+ val formattedCurrentDate = localDateTime
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
+ val formattedCurrentTime = localDateTime
+ .toLocalTime
+ .format(DateTimeFormatter.ofPattern("HH:mm:ss"))
+ val formattedCurrentTimestamp = localDateTime
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
+
+ // the CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP/NOW() functions are
+ // not deterministic, thus we
+ // use following pattern to check the returned SQL timestamp in session
time zone UTC
+ testSqlApi(
+ s"DATE_SUB(CURRENT_DATE, DATE '$formattedCurrentDate') = 0",
Review comment:
There is still possible the return value is not zero, e.g. two time
points crosses two days.
##########
File path: docs/data/sql_functions.yml
##########
@@ -420,13 +420,18 @@ temporal:
description: Returns the current SQL date in the UTC time zone.
- sql: CURRENT_TIME
table: currentTime()
- description: Returns the current SQL time in the UTC time zone.
+ description: Returns the current SQL time in the local time zone. This
function is not deterministic, it would be evaluated for each record in stream
job, it would be evaluated once in batch job at query start.
+ - sql: CURRENT_TIMESTAMP
+ table: currentTimestamp()
+ description: Returns the current SQL timestamp in the local time zone, the
return type is TIMESTAMP WITH LOCAL TIME ZONE. This function is not
deterministic, it would be evaluated for each record in stream job, it would be
evaluated once in batch job at query start.
- sql: LOCALTIME
table: localTime()
- description: Returns the current SQL timestamp in the UTC time zone.
+ description: Returns the current SQL timestamp in the local time zone,
this is a synonym of CURRENT_TIME. This function is not deterministic, it would
be evaluated for each record in stream job, it would be evaluated once in batch
job at query start.
Review comment:
What do you think about reordering the time functions in the following
order?
- LOCALTIME
- LOCALTIMESTAMP
- CURRENT_TIME (synonym of LOCALTIME)
- CURRENT_DATE
- CURRENT_TIMESTAMP
- NOW (synonym of CURRENT_TIMESTAMP)
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala
##########
@@ -90,7 +90,7 @@ class WindowTableFunctionTest extends TableTestBase {
util.tableEnv.executeSql(
"""
|CREATE VIEW v1 AS
- |SELECT *, CURRENT_TIMESTAMP AS cur_time
+ |SELECT *, CAST(CURRENT_TIMESTAMP AS TIMESTAMP(0)) AS cur_time
Review comment:
Would be better to replace with `LOCALTIMESTMAP`.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
##########
@@ -20,52 +20,164 @@ package org.apache.flink.table.planner.expressions
import java.sql.Time
import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime, ZoneId}
+import java.time.{LocalDate, LocalDateTime, ZoneId}
+import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
import org.junit.Test
+import scala.collection.mutable
+
/**
* Tests that check all non-deterministic functions can be executed.
*/
class NonDeterministicTests extends ExpressionTestBase {
@Test
- def testCurrentDate(): Unit = {
+ def testCurrentDateTime(): Unit = {
testAllApis(
currentDate().isGreater("1970-01-01".toDate),
- "currentDate() > '1970-01-01'.toDate",
"CURRENT_DATE > DATE '1970-01-01'",
"true")
- }
- @Test
- def testCurrentTime(): Unit = {
testAllApis(
currentTime().isGreaterOrEqual("00:00:00".toTime),
- "currentTime() >= '00:00:00'.toTime",
"CURRENT_TIME >= TIME '00:00:00'",
"true")
+
+ testAllApis(
+ currentTimestamp().isGreater(
+ "1970-01-01
00:00:00".cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+ s"CURRENT_TIMESTAMP > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
+
+ testSqlApi(s"NOW() > ${timestampLtz("1970-01-01 00:00:00")}",
+ "true")
}
@Test
- def testCurrentTimestamp(): Unit = {
- testAllApis(
- currentTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp),
- "currentTimestamp() > '1970-01-01 00:00:00'.toTimestamp",
- "CURRENT_TIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'",
+ def testCurrentDateTimeInStreamMode(): Unit = {
+ val temporalFunctions = getCodeGenFunctions(List(
+ "CURRENT_DATE",
+ "CURRENT_TIME",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "LOCALTIME",
+ "LOCALTIMESTAMP"))
+ val round1 = evaluateFunctionResult(temporalFunctions)
+ Thread.sleep(1 * 1000L)
+ val round2: List[String] = evaluateFunctionResult(temporalFunctions)
+
+ assertEquals(round1.size, round2.size)
+ round1.zip(round2).zipWithIndex.foreach(r => {
+ // CURRENT_DATE may be same between two records
+ if (r._2 == 0) {
+ assert(r._1._1 <= r._1._2)
+ } else {
+ assert(r._1._1 < r._1._2)
+ }
+ })
+ }
+
+ @Test
+ def testCurrentDateTimeInBatchMode(): Unit = {
+ config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH)
+ config.getConfiguration.setLong("__table.query-start.epoch-time__", 1000L)
+ config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
+ val temporalFunctions = getCodeGenFunctions(List(
+ "CURRENT_DATE",
+ "CURRENT_TIME",
+ "CURRENT_TIMESTAMP",
+ "NOW()",
+ "LOCALTIME",
+ "LOCALTIMESTAMP"))
+
+ val expected = mutable.MutableList[String](
+ "1970-01-01",
+ "00:00:01",
+ "1970-01-01 08:00:01",
+ "1970-01-01 08:00:01",
+ "08:00:01",
+ "1970-01-01 08:00:01")
+
+ val result1 = evaluateFunctionResult(temporalFunctions)
+ assertEquals(expected.toList.sorted, result1.sorted)
+
+ Thread.sleep(1 * 1000L)
+ val result2: List[String] = evaluateFunctionResult(temporalFunctions)
+ assertEquals(expected.toList.sorted, result2.sorted)
+ }
+
+ @Test
+ def testCurrentTimestampInUTC(): Unit = {
+ config.setLocalTimeZone(ZoneId.of("UTC"))
+ val localDateTime = LocalDateTime.now(ZoneId.of("UTC"))
+
+ val formattedCurrentDate = localDateTime
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
+ val formattedCurrentTime = localDateTime
+ .toLocalTime
+ .format(DateTimeFormatter.ofPattern("HH:mm:ss"))
+ val formattedCurrentTimestamp = localDateTime
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
+
+ // the CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP/NOW() functions are
+ // not deterministic, thus we
+ // use following pattern to check the returned SQL timestamp in session
time zone UTC
+ testSqlApi(
+ s"DATE_SUB(CURRENT_DATE, DATE '$formattedCurrentDate') = 0",
+ "true")
+
+ testSqlApi(
+ s"TIME_SUB(CURRENT_TIME, TIME '$formattedCurrentTime') <= 60000",
+ "true")
+
+ testSqlApi(
+ s"${timestampLtz(formattedCurrentTimestamp)} < CURRENT_TIMESTAMP",
+ "true")
+
+ testSqlApi(
+ s"${timestampLtz(formattedCurrentTimestamp)} < NOW()",
"true")
}
@Test
- def testNow(): Unit = {
+ def testCurrentTimestampInShanghai(): Unit = {
Review comment:
Please share common logic with `testCurrentTimestampInUTC`, and we can
merge localtimestamp tests in it.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
##########
@@ -458,8 +461,35 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}
/**
- * Adds a reusable time to the beginning of the SAM of the generated
[[Function]].
- */
+ * Adds a reusable session-level timestamp to the beginning of the SAM of
the generated class.
+ *
+ * <p> The timestamp value is evaluated once for one session, this
+ * function is generally used for batch job.
+ */
+ def addReusableSessionTimestamp(): String = {
Review comment:
Besides, I think the `Timestamp` here is very misleading, it sounds like
it generates a current value of TIMESTAMP type, however, it is TIMESTAMP_LTZ
type. What do you think about renaming to
`addReusableQueryLevelCurrentTimestamp` and
`addReusableRecordLevelCurrentTimestamp`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]