This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bfe49b2973d [FLINK-32245][table-planner] Rename NonDeterministicTests
to NonDeterministicTest and fix incorrect test initialization
bfe49b2973d is described below
commit bfe49b2973d4ffc8f7404a376cab1e419b53406a
Author: Jane Chan <[email protected]>
AuthorDate: Fri Jun 2 21:04:59 2023 +0800
[FLINK-32245][table-planner] Rename NonDeterministicTests to
NonDeterministicTest and fix incorrect test initialization
This closes #22701
---
...isticTests.scala => NonDeterministicTest.scala} | 35 ++++++++++++++--
.../expressions/utils/ExpressionTestBase.scala | 48 ++++++++++++++--------
2 files changed, 62 insertions(+), 21 deletions(-)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala
similarity index 85%
rename from flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
rename to flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala
index a9175dfe963..a21b4b30630 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala
@@ -23,24 +23,31 @@ import org.apache.flink.configuration.ExecutionOptions
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
-import org.apache.flink.table.planner.utils.InternalConfigOptions
+import org.apache.flink.table.planner.utils.{InternalConfigOptions,
TableConfigUtils}
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
+import org.junit.Assume.assumeTrue
import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import java.lang.{Long => JLong}
import java.sql.Time
import java.time.{LocalDate, LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter
+import java.util
import java.util.TimeZone
import scala.collection.mutable
/** Tests that check all non-deterministic functions can be executed. */
-class NonDeterministicTests extends ExpressionTestBase {
+@RunWith(classOf[Parameterized])
+class NonDeterministicTest(isStreaming: Boolean) extends
ExpressionTestBase(isStreaming) {
@Test
def testTemporalFunctionsInStreamMode(): Unit = {
+ assumeTrue(isStreaming)
val temporalFunctions = getCodeGenFunctions(
List(
"CURRENT_DATE",
@@ -76,6 +83,7 @@ class NonDeterministicTests extends ExpressionTestBase {
@Test
def testTemporalFunctionsInBatchMode(): Unit = {
+ assumeTrue(!isStreaming)
val zoneId = ZoneId.of("Asia/Shanghai")
tableConfig.setLocalTimeZone(zoneId)
tableConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
@@ -109,7 +117,7 @@ class NonDeterministicTests extends ExpressionTestBase {
@Test
def testCurrentRowTimestampFunctionsInBatchMode(): Unit = {
- tableConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
+ assumeTrue(!isStreaming)
val temporalFunctions =
getCodeGenFunctions(List("CURRENT_ROW_TIMESTAMP()"))
val round1 = evaluateFunctionResult(temporalFunctions)
@@ -133,8 +141,20 @@ class NonDeterministicTests extends ExpressionTestBase {
testTemporalTimestamp(ZoneId.of("Asia/Shanghai"))
}
- private def testTemporalTimestamp(zoneId: ZoneId): Unit = {
+ private def setEpochAndLocalTime(zoneId: ZoneId): Unit = {
tableConfig.setLocalTimeZone(zoneId)
+ // manually set __table.query-start.epoch-time__ and __table.query-start.local-time__
+ // because they are mandatory for batch codegen, see PlannerBase#beforeTranslation for more details
+ val epochTime: JLong = System.currentTimeMillis()
+ tableConfig.set(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME,
epochTime)
+ val localTime: JLong = epochTime + TimeZone
+ .getTimeZone(TableConfigUtils.getLocalTimeZone(tableConfig))
+ .getOffset(epochTime)
+ tableConfig.set(InternalConfigOptions.TABLE_QUERY_START_LOCAL_TIME,
localTime)
+ }
+
+ private def testTemporalTimestamp(zoneId: ZoneId): Unit = {
+ setEpochAndLocalTime(zoneId)
val localDateTime = LocalDateTime.now(zoneId)
val formattedLocalTime = localDateTime.toLocalTime
@@ -221,3 +241,10 @@ object DateDiffFun extends ScalarFunction {
d1.toEpochDay - d2.toEpochDay
}
}
+
+object NonDeterministicTest {
+ @Parameterized.Parameters(name = "isStream={0}")
+ def parameters(): util.Collection[Boolean] = {
+ util.Arrays.asList(true, false)
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index b133f7d7c21..e52afd43acf 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.core.testutils.FlinkAssertions
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api
-import org.apache.flink.table.api.{EnvironmentSettings, TableException,
ValidationException}
+import org.apache.flink.table.api.{EnvironmentSettings, TableConfig,
TableException, ValidationException}
import
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.data.RowData
@@ -35,11 +35,13 @@ import org.apache.flink.table.data.util.DataFormatConverters
import
org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.planner.calcite.{FlinkPlannerImpl,
FlinkRelBuilder}
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext,
ExprCodeGenerator, FunctionCodeGenerator}
import org.apache.flink.table.planner.delegation.PlannerBase
+import org.apache.flink.table.planner.parse.CalciteParser
import org.apache.flink.table.runtime.generated.GeneratedFunction
import
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
-import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.table.types.{AbstractDataType, DataType}
import org.apache.flink.table.types.logical.{RowType, VarCharType}
import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.types.Row
@@ -60,7 +62,7 @@ import java.util.Collections
import scala.collection.JavaConverters._
import scala.collection.mutable
-abstract class ExpressionTestBase {
+abstract class ExpressionTestBase(isStreaming: Boolean = true) {
// (originalExpr, optimizedExpr, expectedResult)
private val validExprs = mutable.ArrayBuffer[(String, RexNode, String)]()
@@ -71,24 +73,18 @@ abstract class ExpressionTestBase {
.ArrayBuffer[(Expression, String, Class[_ <: Throwable])]()
private val env = StreamExecutionEnvironment.createLocalEnvironment(4)
- private val settings =
EnvironmentSettings.newInstance().inStreamingMode().build()
+ private var settings: EnvironmentSettings = _
// use impl class instead of interface class to avoid
// "Static methods in interface require -target:jvm-1.8"
- private val tEnv = StreamTableEnvironmentImpl
- .create(env, settings)
- .asInstanceOf[StreamTableEnvironmentImpl]
+ private var tEnv: StreamTableEnvironmentImpl = _
- val tableConfig = tEnv.getConfig
+ var tableConfig: TableConfig = _
- private val resolvedDataType = if (containsLegacyTypes) {
- TypeConversions.fromLegacyInfoToDataType(typeInfo)
- } else {
- tEnv.getCatalogManager.getDataTypeFactory.createDataType(testDataType)
- }
- private val planner = tEnv.getPlanner.asInstanceOf[PlannerBase]
- private val relBuilder = planner.createRelBuilder
- private val calcitePlanner = planner.createFlinkPlanner
- private val parser = planner.plannerContext.createCalciteParser()
+ private var resolvedDataType: DataType = _
+ private var planner: PlannerBase = _
+ private var relBuilder: FlinkRelBuilder = _
+ private var calcitePlanner: FlinkPlannerImpl = _
+ private var parser: CalciteParser = _
// setup test utils
private val tableName = "testTable"
@@ -102,10 +98,28 @@ abstract class ExpressionTestBase {
@Before
def prepare(): Unit = {
+ settings = if (isStreaming) {
+ EnvironmentSettings.newInstance().inStreamingMode().build()
+ } else {
+ EnvironmentSettings.newInstance().inBatchMode().build()
+ }
+ tEnv = StreamTableEnvironmentImpl
+ .create(env, settings)
+ .asInstanceOf[StreamTableEnvironmentImpl]
+ planner = tEnv.getPlanner.asInstanceOf[PlannerBase]
+ relBuilder = planner.createRelBuilder
+ calcitePlanner = planner.createFlinkPlanner
+ parser = planner.plannerContext.createCalciteParser()
+ tableConfig = tEnv.getConfig
tableConfig.set(
ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR,
ExecutionConfigOptions.LegacyCastBehaviour.DISABLED
)
+ resolvedDataType = if (containsLegacyTypes) {
+ TypeConversions.fromLegacyInfoToDataType(typeInfo)
+ } else {
+ tEnv.getCatalogManager.getDataTypeFactory.createDataType(testDataType)
+ }
if (containsLegacyTypes) {
val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo)
tEnv.createTemporaryView(tableName, ds,
typeInfo.getFieldNames.map(api.$): _*)