This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bfe49b2973d [FLINK-32245][table-planner] Rename NonDeterministicTests to NonDeterministicTest and fix incorrect test initialization
bfe49b2973d is described below

commit bfe49b2973d4ffc8f7404a376cab1e419b53406a
Author: Jane Chan <[email protected]>
AuthorDate: Fri Jun 2 21:04:59 2023 +0800

    [FLINK-32245][table-planner] Rename NonDeterministicTests to NonDeterministicTest and fix incorrect test initialization
    
    This closes #22701
---
 ...isticTests.scala => NonDeterministicTest.scala} | 35 ++++++++++++++--
 .../expressions/utils/ExpressionTestBase.scala     | 48 ++++++++++++++--------
 2 files changed, 62 insertions(+), 21 deletions(-)
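
Note (not part of the commit): besides the rename, the change above turns the suite into a JUnit 4 parameterized test that runs every case once in streaming mode and once in batch mode. A minimal, self-contained sketch of that pattern, using a hypothetical test class name, looks like this:

    import java.util

    import org.junit.Assume.assumeTrue
    import org.junit.Test
    import org.junit.runner.RunWith
    import org.junit.runners.Parameterized

    // Hypothetical example class, not part of the commit.
    @RunWith(classOf[Parameterized])
    class ModeAwareTest(isStreaming: Boolean) {

      @Test
      def streamingOnlyCase(): Unit = {
        // assumeTrue skips (rather than fails) this test for the batch parameter
        assumeTrue(isStreaming)
        // streaming-specific assertions would go here
      }
    }

    object ModeAwareTest {
      // JUnit finds this via the static forwarder generated for the companion object
      @Parameterized.Parameters(name = "isStream={0}")
      def parameters(): util.Collection[Boolean] = util.Arrays.asList(true, false)
    }

Each element returned by parameters() becomes one constructor argument, so the same test methods are executed for isStreaming = true and isStreaming = false.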

diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala
similarity index 85%
rename from flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
rename to flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala
index a9175dfe963..a21b4b30630 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTest.scala
@@ -23,24 +23,31 @@ import org.apache.flink.configuration.ExecutionOptions
 import org.apache.flink.table.api._
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
-import org.apache.flink.table.planner.utils.InternalConfigOptions
+import org.apache.flink.table.planner.utils.{InternalConfigOptions, TableConfigUtils}
 import org.apache.flink.types.Row
 
 import org.junit.Assert.assertEquals
+import org.junit.Assume.assumeTrue
 import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
 
+import java.lang.{Long => JLong}
 import java.sql.Time
 import java.time.{LocalDate, LocalDateTime, ZoneId}
 import java.time.format.DateTimeFormatter
+import java.util
 import java.util.TimeZone
 
 import scala.collection.mutable
 
 /** Tests that check all non-deterministic functions can be executed. */
-class NonDeterministicTests extends ExpressionTestBase {
+@RunWith(classOf[Parameterized])
+class NonDeterministicTest(isStreaming: Boolean) extends ExpressionTestBase(isStreaming) {
 
   @Test
   def testTemporalFunctionsInStreamMode(): Unit = {
+    assumeTrue(isStreaming)
     val temporalFunctions = getCodeGenFunctions(
       List(
         "CURRENT_DATE",
@@ -76,6 +83,7 @@ class NonDeterministicTests extends ExpressionTestBase {
 
   @Test
   def testTemporalFunctionsInBatchMode(): Unit = {
+    assumeTrue(!isStreaming)
     val zoneId = ZoneId.of("Asia/Shanghai")
     tableConfig.setLocalTimeZone(zoneId)
     tableConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
@@ -109,7 +117,7 @@ class NonDeterministicTests extends ExpressionTestBase {
 
   @Test
   def testCurrentRowTimestampFunctionsInBatchMode(): Unit = {
-    tableConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)
+    assumeTrue(!isStreaming)
     val temporalFunctions = getCodeGenFunctions(List("CURRENT_ROW_TIMESTAMP()"))
 
     val round1 = evaluateFunctionResult(temporalFunctions)
@@ -133,8 +141,20 @@ class NonDeterministicTests extends ExpressionTestBase {
     testTemporalTimestamp(ZoneId.of("Asia/Shanghai"))
   }
 
-  private def testTemporalTimestamp(zoneId: ZoneId): Unit = {
+  private def setEpochAndLocalTime(zoneId: ZoneId): Unit = {
     tableConfig.setLocalTimeZone(zoneId)
+    // manually set __table.query-start.epoch-time__ and __table.query-start.local-time__
+    // because they are mandatory for batch codegen, see PlannerBase#beforeTranslation for more details
+    val epochTime: JLong = System.currentTimeMillis()
+    tableConfig.set(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME, epochTime)
+    val localTime: JLong = epochTime + TimeZone
+      .getTimeZone(TableConfigUtils.getLocalTimeZone(tableConfig))
+      .getOffset(epochTime)
+    tableConfig.set(InternalConfigOptions.TABLE_QUERY_START_LOCAL_TIME, localTime)
+  }
+
+  private def testTemporalTimestamp(zoneId: ZoneId): Unit = {
+    setEpochAndLocalTime(zoneId)
     val localDateTime = LocalDateTime.now(zoneId)
 
     val formattedLocalTime = localDateTime.toLocalTime
@@ -221,3 +241,10 @@ object DateDiffFun extends ScalarFunction {
     d1.toEpochDay - d2.toEpochDay
   }
 }
+
+object NonDeterministicTest {
+  @Parameterized.Parameters(name = "isStream={0}")
+  def parameters(): util.Collection[Boolean] = {
+    util.Arrays.asList(true, false)
+  }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index b133f7d7c21..e52afd43acf 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.testutils.FlinkAssertions
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api
-import org.apache.flink.table.api.{EnvironmentSettings, TableException, ValidationException}
+import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableException, ValidationException}
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.data.RowData
@@ -35,11 +35,13 @@ import org.apache.flink.table.data.util.DataFormatConverters
 import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.planner.calcite.{FlinkPlannerImpl, FlinkRelBuilder}
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
 import org.apache.flink.table.planner.delegation.PlannerBase
+import org.apache.flink.table.planner.parse.CalciteParser
 import org.apache.flink.table.runtime.generated.GeneratedFunction
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
-import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.table.types.{AbstractDataType, DataType}
 import org.apache.flink.table.types.logical.{RowType, VarCharType}
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.types.Row
@@ -60,7 +62,7 @@ import java.util.Collections
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-abstract class ExpressionTestBase {
+abstract class ExpressionTestBase(isStreaming: Boolean = true) {
 
   // (originalExpr, optimizedExpr, expectedResult)
   private val validExprs = mutable.ArrayBuffer[(String, RexNode, String)]()
@@ -71,24 +73,18 @@ abstract class ExpressionTestBase {
     .ArrayBuffer[(Expression, String, Class[_ <: Throwable])]()
 
   private val env = StreamExecutionEnvironment.createLocalEnvironment(4)
-  private val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
+  private var settings: EnvironmentSettings = _
   // use impl class instead of interface class to avoid
   // "Static methods in interface require -target:jvm-1.8"
-  private val tEnv = StreamTableEnvironmentImpl
-    .create(env, settings)
-    .asInstanceOf[StreamTableEnvironmentImpl]
+  private var tEnv: StreamTableEnvironmentImpl = _
 
-  val tableConfig = tEnv.getConfig
+  var tableConfig: TableConfig = _
 
-  private val resolvedDataType = if (containsLegacyTypes) {
-    TypeConversions.fromLegacyInfoToDataType(typeInfo)
-  } else {
-    tEnv.getCatalogManager.getDataTypeFactory.createDataType(testDataType)
-  }
-  private val planner = tEnv.getPlanner.asInstanceOf[PlannerBase]
-  private val relBuilder = planner.createRelBuilder
-  private val calcitePlanner = planner.createFlinkPlanner
-  private val parser = planner.plannerContext.createCalciteParser()
+  private var resolvedDataType: DataType = _
+  private var planner: PlannerBase = _
+  private var relBuilder: FlinkRelBuilder = _
+  private var calcitePlanner: FlinkPlannerImpl = _
+  private var parser: CalciteParser = _
 
   // setup test utils
   private val tableName = "testTable"
@@ -102,10 +98,28 @@ abstract class ExpressionTestBase {
 
   @Before
   def prepare(): Unit = {
+    settings = if (isStreaming) {
+      EnvironmentSettings.newInstance().inStreamingMode().build()
+    } else {
+      EnvironmentSettings.newInstance().inBatchMode().build()
+    }
+    tEnv = StreamTableEnvironmentImpl
+      .create(env, settings)
+      .asInstanceOf[StreamTableEnvironmentImpl]
+    planner = tEnv.getPlanner.asInstanceOf[PlannerBase]
+    relBuilder = planner.createRelBuilder
+    calcitePlanner = planner.createFlinkPlanner
+    parser = planner.plannerContext.createCalciteParser()
+    tableConfig = tEnv.getConfig
     tableConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_LEGACY_CAST_BEHAVIOUR,
       ExecutionConfigOptions.LegacyCastBehaviour.DISABLED
     )
+    resolvedDataType = if (containsLegacyTypes) {
+      TypeConversions.fromLegacyInfoToDataType(typeInfo)
+    } else {
+      tEnv.getCatalogManager.getDataTypeFactory.createDataType(testDataType)
+    }
     if (containsLegacyTypes) {
       val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo)
       tEnv.createTemporaryView(tableName, ds, typeInfo.getFieldNames.map(api.$): _*)

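Note (not part of the commit): the ExpressionTestBase hunk above replaces eagerly initialized vals with vars that are populated in the @Before method, so the streaming/batch choice can be made per parameterized run instead of once at construction time. A reduced sketch of that deferred-initialization pattern, with a hypothetical base-class name, could look like this:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl

    import org.junit.Before

    // Hypothetical example class, not part of the commit.
    abstract class LazyEnvTestBase(isStreaming: Boolean = true) {

      private val env = StreamExecutionEnvironment.createLocalEnvironment(4)
      // built lazily so it can honour the constructor parameter of the concrete test
      protected var tEnv: StreamTableEnvironmentImpl = _

      @Before
      def prepare(): Unit = {
        val settings =
          if (isStreaming) EnvironmentSettings.newInstance().inStreamingMode().build()
          else EnvironmentSettings.newInstance().inBatchMode().build()
        tEnv = StreamTableEnvironmentImpl
          .create(env, settings)
          .asInstanceOf[StreamTableEnvironmentImpl]
      }
    }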