This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2f88660b2b0897e57565e5b197ba83f20a03228
Author: Timo Walther <twal...@apache.org>
AuthorDate: Tue May 26 16:47:06 2020 +0200

    [hotfix][table-planner-blink] Prepare ExpressionTestBase for new type system
---
 .../expressions/utils/ExpressionTestBase.scala     | 116 ++++++++++++++-------
 1 file changed, 80 insertions(+), 36 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index 8469857..71a7687 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -18,42 +18,42 @@
 
 package org.apache.flink.table.planner.expressions.utils
 
+import java.util.Collections
+
+import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
 import org.apache.flink.api.common.TaskInfo
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext
 import org.apache.flink.api.common.functions.{MapFunction, RichFunction, 
RichMapFunction}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.internal.TableEnvironmentImpl
 import 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig}
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.data.binary.BinaryRowData
+import org.apache.flink.table.data.conversion.{DataStructureConverter, 
DataStructureConverters}
 import org.apache.flink.table.data.util.DataFormatConverters
+import 
org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
ExprCodeGenerator, FunctionCodeGenerator}
 import org.apache.flink.table.planner.delegation.PlannerBase
 import 
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
-import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.AbstractDataType
 import org.apache.flink.table.types.logical.{RowType, VarCharType}
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.types.Row
-
-import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.logical.{LogicalCalc, LogicalTableScan}
-import org.apache.calcite.rel.rules._
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
-
 import org.junit.Assert.{assertEquals, fail}
 import org.junit.rules.ExpectedException
 import org.junit.{After, Before, Rule}
 
-import java.util.Collections
-
 import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 abstract class ExpressionTestBase {
 
@@ -66,7 +66,13 @@ abstract class ExpressionTestBase {
   // use impl class instead of interface class to avoid
   // "Static methods in interface require -target:jvm-1.8"
   private val tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
-  private val planner = 
tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
+    .asInstanceOf[StreamTableEnvironmentImpl]
+  private val resolvedDataType = if (containsLegacyTypes) {
+    TypeConversions.fromLegacyInfoToDataType(typeInfo)
+  } else {
+    tEnv.getCatalogManager.getDataTypeFactory.createDataType(testDataType)
+  }
+  private val planner = tEnv.getPlanner.asInstanceOf[PlannerBase]
   private val relBuilder = planner.getRelBuilder
   private val calcitePlanner = planner.createFlinkPlanner
   private val parser = planner.plannerContext.createCalciteParser()
@@ -82,13 +88,16 @@ abstract class ExpressionTestBase {
   @Rule
   def thrown: ExpectedException = expectedException
 
-  def functions: Map[String, ScalarFunction] = Map()
-
   @Before
   def prepare(): Unit = {
-    val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo)
-    tEnv.createTemporaryView(tableName, ds)
-    functions.foreach(f => tEnv.registerFunction(f._1, f._2))
+    if (containsLegacyTypes) {
+      val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo)
+      tEnv.createTemporaryView(tableName, ds)
+      functions.foreach(f => tEnv.registerFunction(f._1, f._2))
+    } else {
+      tEnv.createTemporaryView(tableName, tEnv.fromValues(resolvedDataType))
+      testSystemFunctions.asScala.foreach(e => 
tEnv.createTemporarySystemFunction(e._1, e._2))
+    }
 
     // prepare RelBuilder
     relBuilder.scan(tableName)
@@ -100,7 +109,11 @@ abstract class ExpressionTestBase {
   @After
   def evaluateExprs(): Unit = {
     val ctx = CodeGeneratorContext(config)
-    val inputType = fromTypeInfoToLogicalType(typeInfo)
+    val inputType = if (containsLegacyTypes) {
+      fromTypeInfoToLogicalType(typeInfo)
+    } else {
+      resolvedDataType.getLogicalType
+    }
     val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = 
false).bindInput(inputType)
 
     // cast expressions to String
@@ -145,10 +158,18 @@ abstract class ExpressionTestBase {
       richMapper.open(new Configuration())
     }
 
-    val converter = DataFormatConverters
-      .getConverterForDataType(dataType)
-      .asInstanceOf[DataFormatConverters.DataFormatConverter[RowData, Row]]
-    val testRow = converter.toInternal(testData)
+    val testRow = if (containsLegacyTypes) {
+      val converter = DataFormatConverters
+        .getConverterForDataType(resolvedDataType)
+        .asInstanceOf[DataFormatConverter[RowData, Row]]
+      converter.toInternal(testData)
+    } else {
+      val converter = DataStructureConverters
+        .getConverter(resolvedDataType)
+        .asInstanceOf[DataStructureConverter[RowData, Row]]
+      converter.toInternalOrNull(testData)
+    }
+
     val result = mapper.map(testRow)
 
     // call close method for RichFunction
@@ -194,7 +215,7 @@ abstract class ExpressionTestBase {
     val optimized = hep.findBestExp()
 
     // throw exception if plan contains more than a calc
-    if (!optimized.getInput(0).isInstanceOf[LogicalTableScan]) {
+    if (!optimized.getInput(0).getInputs.isEmpty) {
       fail("Expression is converted into more than a Calc operation. Use a 
different test method.")
     }
 
@@ -210,24 +231,14 @@ abstract class ExpressionTestBase {
 
   def testAllApis(
       expr: Expression,
-      exprString: String,
       sqlExpr: String,
       expected: String): Unit = {
     addTableApiTestExpr(expr, expected)
-    addTableApiTestExpr(exprString, expected)
     addSqlTestExpr(sqlExpr, expected)
   }
 
   def testTableApi(
       expr: Expression,
-      exprString: String,
-      expected: String): Unit = {
-    addTableApiTestExpr(expr, expected)
-    addTableApiTestExpr(exprString, expected)
-  }
-
-  def testTableApi(
-      expr: Expression,
       expected: String): Unit = {
     addTableApiTestExpr(expr, expected)
   }
@@ -252,8 +263,41 @@ abstract class ExpressionTestBase {
 
   def testData: Row
 
-  def typeInfo: RowTypeInfo
+  def testDataType: AbstractDataType[_] =
+    throw new IllegalArgumentException("Implement this if no legacy types are 
expected.")
+
+  def testSystemFunctions: java.util.Map[String, ScalarFunction] = 
Collections.emptyMap();
+
+  // ----------------------------------------------------------------------------------------------
+  // Legacy type system
+  // ----------------------------------------------------------------------------------------------
 
-  def dataType: DataType = TypeConversions.fromLegacyInfoToDataType(typeInfo)
+  def containsLegacyTypes: Boolean = true
 
+  @deprecated
+  def functions: Map[String, ScalarFunction] = Map()
+
+  @deprecated
+  def typeInfo: RowTypeInfo =
+    throw new IllegalArgumentException("Implement this if legacy types are 
expected.")
+
+  @deprecated
+  def testAllApis(
+      expr: Expression,
+      exprString: String,
+      sqlExpr: String,
+      expected: String): Unit = {
+    addTableApiTestExpr(expr, expected)
+    addTableApiTestExpr(exprString, expected)
+    addSqlTestExpr(sqlExpr, expected)
+  }
+
+  @deprecated
+  def testTableApi(
+      expr: Expression,
+      exprString: String,
+      expected: String): Unit = {
+    addTableApiTestExpr(expr, expected)
+    addTableApiTestExpr(exprString, expected)
+  }
 }

Reply via email to