This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b2f88660b2b0897e57565e5b197ba83f20a03228 Author: Timo Walther <twal...@apache.org> AuthorDate: Tue May 26 16:47:06 2020 +0200 [hotfix][table-planner-blink] Prepare ExpressionTestBase for new type system --- .../expressions/utils/ExpressionTestBase.scala | 116 ++++++++++++++------- 1 file changed, 80 insertions(+), 36 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala index 8469857..71a7687 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala @@ -18,42 +18,42 @@ package org.apache.flink.table.planner.expressions.utils +import java.util.Collections + +import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.logical.LogicalCalc +import org.apache.calcite.rel.rules._ +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR import org.apache.flink.api.common.TaskInfo import org.apache.flink.api.common.functions.util.RuntimeUDFContext import org.apache.flink.api.common.functions.{MapFunction, RichFunction, RichMapFunction} import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.table.api.internal.TableEnvironmentImpl import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl import org.apache.flink.table.api.{EnvironmentSettings, TableConfig} import org.apache.flink.table.data.RowData import org.apache.flink.table.data.binary.BinaryRowData +import org.apache.flink.table.data.conversion.{DataStructureConverter, DataStructureConverters} import org.apache.flink.table.data.util.DataFormatConverters +import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter import org.apache.flink.table.expressions.{Expression, ExpressionParser} import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator} import org.apache.flink.table.planner.delegation.PlannerBase import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType -import org.apache.flink.table.types.DataType +import org.apache.flink.table.types.AbstractDataType import org.apache.flink.table.types.logical.{RowType, VarCharType} import org.apache.flink.table.types.utils.TypeConversions import org.apache.flink.types.Row - -import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.logical.{LogicalCalc, LogicalTableScan} -import org.apache.calcite.rel.rules._ -import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR - import org.junit.Assert.{assertEquals, fail} import org.junit.rules.ExpectedException import org.junit.{After, Before, Rule} -import java.util.Collections - import scala.collection.mutable +import scala.collection.JavaConverters._ abstract class ExpressionTestBase { @@ -66,7 +66,13 @@ abstract class ExpressionTestBase { // use impl class instead of interface class to avoid // "Static methods in interface require -target:jvm-1.8" private val tEnv = StreamTableEnvironmentImpl.create(env, setting, config) - private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase] + .asInstanceOf[StreamTableEnvironmentImpl] + private val resolvedDataType = if (containsLegacyTypes) { + TypeConversions.fromLegacyInfoToDataType(typeInfo) + } else { + tEnv.getCatalogManager.getDataTypeFactory.createDataType(testDataType) + } + private val planner = tEnv.getPlanner.asInstanceOf[PlannerBase] private val relBuilder = planner.getRelBuilder private val calcitePlanner = planner.createFlinkPlanner private val parser = planner.plannerContext.createCalciteParser() @@ -82,13 +88,16 @@ abstract class ExpressionTestBase { @Rule def thrown: ExpectedException = expectedException - def functions: Map[String, ScalarFunction] = Map() - @Before def prepare(): Unit = { - val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo) - tEnv.createTemporaryView(tableName, ds) - functions.foreach(f => tEnv.registerFunction(f._1, f._2)) + if (containsLegacyTypes) { + val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo) + tEnv.createTemporaryView(tableName, ds) + functions.foreach(f => tEnv.registerFunction(f._1, f._2)) + } else { + tEnv.createTemporaryView(tableName, tEnv.fromValues(resolvedDataType)) + testSystemFunctions.asScala.foreach(e => tEnv.createTemporarySystemFunction(e._1, e._2)) + } // prepare RelBuilder relBuilder.scan(tableName) @@ -100,7 +109,11 @@ abstract class ExpressionTestBase { @After def evaluateExprs(): Unit = { val ctx = CodeGeneratorContext(config) - val inputType = fromTypeInfoToLogicalType(typeInfo) + val inputType = if (containsLegacyTypes) { + fromTypeInfoToLogicalType(typeInfo) + } else { + resolvedDataType.getLogicalType + } val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false).bindInput(inputType) // cast expressions to String @@ -145,10 +158,18 @@ abstract class ExpressionTestBase { richMapper.open(new Configuration()) } - val converter = DataFormatConverters - .getConverterForDataType(dataType) - .asInstanceOf[DataFormatConverters.DataFormatConverter[RowData, Row]] - val testRow = converter.toInternal(testData) + val testRow = if (containsLegacyTypes) { + val converter = DataFormatConverters + .getConverterForDataType(resolvedDataType) + .asInstanceOf[DataFormatConverter[RowData, Row]] + converter.toInternal(testData) + } else { + val converter = DataStructureConverters + .getConverter(resolvedDataType) + .asInstanceOf[DataStructureConverter[RowData, Row]] + converter.toInternalOrNull(testData) + } + val result = mapper.map(testRow) // call close method for RichFunction @@ -194,7 +215,7 @@ abstract class ExpressionTestBase { val optimized = hep.findBestExp() // throw exception if plan contains more than a calc - if (!optimized.getInput(0).isInstanceOf[LogicalTableScan]) { + if (!optimized.getInput(0).getInputs.isEmpty) { fail("Expression is converted into more than a Calc operation. Use a different test method.") } @@ -210,24 +231,14 @@ abstract class ExpressionTestBase { def testAllApis( expr: Expression, - exprString: String, sqlExpr: String, expected: String): Unit = { addTableApiTestExpr(expr, expected) - addTableApiTestExpr(exprString, expected) addSqlTestExpr(sqlExpr, expected) } def testTableApi( expr: Expression, - exprString: String, - expected: String): Unit = { - addTableApiTestExpr(expr, expected) - addTableApiTestExpr(exprString, expected) - } - - def testTableApi( - expr: Expression, expected: String): Unit = { addTableApiTestExpr(expr, expected) } @@ -252,8 +263,41 @@ abstract class ExpressionTestBase { def testData: Row - def typeInfo: RowTypeInfo + def testDataType: AbstractDataType[_] = + throw new IllegalArgumentException("Implement this if no legacy types are expected.") + + def testSystemFunctions: java.util.Map[String, ScalarFunction] = Collections.emptyMap(); + + // ---------------------------------------------------------------------------------------------- + // Legacy type system + // ---------------------------------------------------------------------------------------------- - def dataType: DataType = TypeConversions.fromLegacyInfoToDataType(typeInfo) + def containsLegacyTypes: Boolean = true + @deprecated + def functions: Map[String, ScalarFunction] = Map() + + @deprecated + def typeInfo: RowTypeInfo = + throw new IllegalArgumentException("Implement this if legacy types are expected.") + + @deprecated + def testAllApis( + expr: Expression, + exprString: String, + sqlExpr: String, + expected: String): Unit = { + addTableApiTestExpr(expr, expected) + addTableApiTestExpr(exprString, expected) + addSqlTestExpr(sqlExpr, expected) + } + + @deprecated + def testTableApi( + expr: Expression, + exprString: String, + expected: String): Unit = { + addTableApiTestExpr(expr, expected) + addTableApiTestExpr(exprString, expected) + } }