Repository: flink Updated Branches: refs/heads/master 910f733f5 -> ecfb5b5f6
[FLINK-4825] [table] Implement a RexExecutor that uses Flink's code generation. This closes #2884 This closes #2874 (closing PR with Public API breaking changes) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db441dec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db441dec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db441dec Branch: refs/heads/master Commit: db441decb41bf856400766023bfc7de77d6041aa Parents: 910f733 Author: twalthr <[email protected]> Authored: Mon Nov 28 12:18:36 2016 +0100 Committer: Fabian Hueske <[email protected]> Committed: Tue Nov 29 13:14:40 2016 +0100 ---------------------------------------------------------------------- .../flink/api/table/FlinkRelBuilder.scala | 1 + .../flink/api/table/TableEnvironment.scala | 5 +- .../flink/api/table/codegen/Compiler.scala | 41 +++++++ .../api/table/codegen/ExpressionReducer.scala | 117 +++++++++++++++++++ .../api/table/plan/rules/FlinkRuleSets.scala | 16 ++- .../flink/api/table/runtime/Compiler.scala | 42 ------- .../api/table/runtime/FlatJoinRunner.scala | 1 + .../flink/api/table/runtime/FlatMapRunner.scala | 1 + .../flink/api/table/runtime/MapRunner.scala | 1 + .../table/runtime/io/ValuesInputFormat.scala | 2 +- .../api/scala/batch/sql/SetOperatorsTest.scala | 2 +- .../api/table/ExpressionReductionTest.scala | 20 ++-- .../table/expressions/ScalarOperatorsTest.scala | 2 +- .../expressions/utils/ExpressionTestBase.scala | 3 +- 14 files changed, 185 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala index ea4eed0..da44ebb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala @@ -80,6 +80,7 @@ object FlinkRelBuilder { // create context instances with Flink type factory val planner = new VolcanoPlanner(Contexts.empty()) + planner.setExecutor(config.getExecutor) planner.addRelTraitDef(ConventionTraitDef.INSTANCE) val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory)) val calciteSchema = CalciteSchema.from(config.getDefaultSchema) http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index e8734f5..7b2b738 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -25,7 +25,7 @@ import org.apache.calcite.config.Lex import org.apache.calcite.plan.RelOptPlanner import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex.RexExecutorImpl -import org.apache.calcite.schema.{Schemas, SchemaPlus} +import org.apache.calcite.schema.{SchemaPlus, Schemas} import org.apache.calcite.schema.impl.AbstractTable import org.apache.calcite.sql.SqlOperatorTable import org.apache.calcite.sql.parser.SqlParser @@ -38,6 +38,7 @@ import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv} import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} +import org.apache.flink.api.table.codegen.ExpressionReducer import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference} import org.apache.flink.api.table.functions.{ScalarFunction, UserDefinedFunction} import org.apache.flink.api.table.plan.cost.DataSetCostFactory @@ -71,7 +72,7 @@ abstract class TableEnvironment(val config: TableConfig) { .typeSystem(new FlinkTypeSystem) .operatorTable(getSqlOperatorTable) // set the executor to evaluate constant expressions - .executor(new RexExecutorImpl(Schemas.createDataContext(null))) + .executor(new ExpressionReducer(config)) .build // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree. http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala new file mode 100644 index 0000000..fce13ba --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.codegen + +import org.apache.flink.api.common.InvalidProgramException +import org.codehaus.commons.compiler.CompileException +import org.codehaus.janino.SimpleCompiler + +trait Compiler[T] { + + @throws(classOf[CompileException]) + def compile(cl: ClassLoader, name: String, code: String): Class[T] = { + require(cl != null, "Classloader must not be null.") + val compiler = new SimpleCompiler() + compiler.setParentClassLoader(cl) + try { + compiler.cook(code) + } catch { + case e: CompileException => + throw new InvalidProgramException("Table program cannot be compiled. " + + "This is a bug. Please file an issue.", e) + } + compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]] + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala new file mode 100644 index 0000000..74756ef --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.codegen + +import java.util + +import org.apache.calcite.plan.RelOptPlanner +import org.apache.calcite.rex.{RexBuilder, RexNode} +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter} +import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableConfig} + +import scala.collection.JavaConverters._ + +/** + * Evaluates constant expressions using Flink's [[CodeGenerator]]. + */ +class ExpressionReducer(config: TableConfig) + extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] { + + private val EMPTY_ROW_INFO = TypeConverter.DEFAULT_ROW_TYPE + private val EMPTY_ROW = new Row(0) + + override def reduce( + rexBuilder: RexBuilder, + constExprs: util.List[RexNode], + reducedValues: util.List[RexNode]): Unit = { + + val typeFactory = rexBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory] + + val literals = constExprs.asScala.map(e => (e.getType.getSqlTypeName, e)).flatMap { + + // we need to cast here for RexBuilder.makeLiteral + case (SqlTypeName.DATE, e) => + Some( + rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e) + ) + case (SqlTypeName.TIME, e) => + Some( + rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e) + ) + case (SqlTypeName.TIMESTAMP, e) => + Some( + rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO), e) + ) + + // we don't support object literals yet, we skip those constant expressions + case (SqlTypeName.ANY, _) | (SqlTypeName.ROW, _) => None + + case (_, e) => Some(e) + } + + val literalTypes = literals.map(e => FlinkTypeFactory.toTypeInfo(e.getType)) + val resultType = new RowTypeInfo(literalTypes) + + // generate MapFunction + val generator = new CodeGenerator(config, false, EMPTY_ROW_INFO) + + val result = generator.generateResultExpression( + resultType, + resultType.getFieldNames, + literals) + + val generatedFunction = generator.generateFunction[MapFunction[Row, Row]]( + "ExpressionReducer", + classOf[MapFunction[Row, Row]], + s""" + |${result.code} + |return ${result.resultTerm}; + |""".stripMargin, + resultType.asInstanceOf[TypeInformation[Any]]) + + val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code) + val function = clazz.newInstance() + + // execute + val reduced = function.map(EMPTY_ROW) + + // add the reduced results or keep them unreduced + var i = 0 + var reducedIdx = 0 + while (i < constExprs.size()) { + val unreduced = constExprs.get(i) + unreduced.getType.getSqlTypeName match { + // we insert the original expression for object literals + case SqlTypeName.ANY | SqlTypeName.ROW => + reducedValues.add(unreduced) + case _ => + val literal = rexBuilder.makeLiteral( + reduced.productElement(reducedIdx), + unreduced.getType, + true) + reducedValues.add(literal) + reducedIdx += 1 + } + i += 1 + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala index 638deac..5653083 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -75,11 +75,10 @@ object FlinkRuleSets { SortRemoveRule.INSTANCE, // simplify expressions rules - // TODO uncomment if FLINK-4825 is solved - // ReduceExpressionsRule.FILTER_INSTANCE, - // ReduceExpressionsRule.PROJECT_INSTANCE, - // ReduceExpressionsRule.CALC_INSTANCE, - // ReduceExpressionsRule.JOIN_INSTANCE, + ReduceExpressionsRule.FILTER_INSTANCE, + ReduceExpressionsRule.PROJECT_INSTANCE, + ReduceExpressionsRule.CALC_INSTANCE, + ReduceExpressionsRule.JOIN_INSTANCE, // prune empty results rules PruneEmptyRules.AGGREGATE_INSTANCE, @@ -137,10 +136,9 @@ object FlinkRuleSets { ProjectRemoveRule.INSTANCE, // simplify expressions rules - // TODO uncomment if FLINK-4825 is solved - // ReduceExpressionsRule.FILTER_INSTANCE, - // ReduceExpressionsRule.PROJECT_INSTANCE, - // ReduceExpressionsRule.CALC_INSTANCE, + ReduceExpressionsRule.FILTER_INSTANCE, + ReduceExpressionsRule.PROJECT_INSTANCE, + ReduceExpressionsRule.CALC_INSTANCE, // merge and push unions rules UnionEliminatorRule.INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala deleted file mode 100644 index c5d566e..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.common.InvalidProgramException -import org.apache.flink.api.common.functions.Function -import org.codehaus.commons.compiler.CompileException -import org.codehaus.janino.SimpleCompiler - -trait Compiler[T] { - - @throws(classOf[CompileException]) - def compile(cl: ClassLoader, name: String, code: String): Class[T] = { - require(cl != null, "Classloader must not be null.") - val compiler = new SimpleCompiler() - compiler.setParentClassLoader(cl) - try { - compiler.cook(code) - } catch { - case e: CompileException => - throw new InvalidProgramException("Table program cannot be compiled. " + - "This is a bug. Please file an issue.", e) - } - compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]] - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala index c6a8fe8..2e57a0f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala @@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.api.table.codegen.Compiler import org.apache.flink.configuration.Configuration import org.apache.flink.util.Collector import org.slf4j.LoggerFactory http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala index 2e942eb..e228e2b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala @@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.api.table.codegen.Compiler import org.apache.flink.configuration.Configuration import org.apache.flink.util.Collector import org.slf4j.LoggerFactory http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala index 944b415..9fd1876 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala @@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.api.table.codegen.Compiler import org.apache.flink.configuration.Configuration import org.slf4j.LoggerFactory http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala index 34bff15..2a4be46 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala @@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.io import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.api.table.runtime.Compiler +import org.apache.flink.api.table.codegen.Compiler import org.apache.flink.core.io.GenericInputSplit import org.slf4j.LoggerFactory http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala index 5bc6e4a..7b2b497 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala @@ -54,7 +54,7 @@ class SetOperatorsTest extends TableTestBase { term("join", "b_long", "b_int", "b_string", "a_long"), term("joinType", "InnerJoin") ), - term("select", "a_long", "true AS $f0") + term("select", "true AS $f0", "a_long") ), term("groupBy", "a_long"), term("select", "a_long", "MIN($f0) AS $f1") http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala index 9694687..b8156a2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala @@ -21,10 +21,8 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.utils.TableTestBase import org.apache.flink.api.table.utils.TableTestUtil._ -import org.junit.{Ignore, Test} +import org.junit.Test -// TODO enable if FLINK-4825 is solved -@Ignore class ExpressionReductionTest extends TableTestBase { @Test @@ -64,7 +62,7 @@ class ExpressionReductionTest extends TableTestBase { "true AS EXPR$9", "2 AS EXPR$10", "true AS EXPR$11", - "'TRUEX' AS EXPR$12" + "'trueX' AS EXPR$12" ), term("where", ">(a, 8)") ) @@ -109,7 +107,7 @@ class ExpressionReductionTest extends TableTestBase { "true AS EXPR$9", "2 AS EXPR$10", "true AS EXPR$11", - "'TRUEX' AS EXPR$12" + "'trueX' AS EXPR$12" ) ) @@ -164,7 +162,7 @@ class ExpressionReductionTest extends TableTestBase { "false AS _c5", "true AS _c6", "2E0 AS _c7", - "'TRUEX' AS _c8" + "'trueX' AS _c8" ), term("where", ">(a, 8)") ) @@ -200,7 +198,7 @@ class ExpressionReductionTest extends TableTestBase { "false AS _c5", "true AS _c6", "2E0 AS _c7", - "'TRUEX' AS _c8" + "'trueX' AS _c8" ) ) @@ -262,7 +260,7 @@ class ExpressionReductionTest extends TableTestBase { "true AS EXPR$9", "2 AS EXPR$10", "true AS EXPR$11", - "'TRUEX' AS EXPR$12" + "'trueX' AS EXPR$12" ), term("where", ">(a, 8)") ) @@ -307,7 +305,7 @@ class ExpressionReductionTest extends TableTestBase { "true AS EXPR$9", "2 AS EXPR$10", "true AS EXPR$11", - "'TRUEX' AS EXPR$12" + "'trueX' AS EXPR$12" ) ) @@ -362,7 +360,7 @@ class ExpressionReductionTest extends TableTestBase { "false AS _c5", "true AS _c6", "2E0 AS _c7", - "'TRUEX' AS _c8" + "'trueX' AS _c8" ), term("where", ">(a, 8)") ) @@ -398,7 +396,7 @@ class ExpressionReductionTest extends TableTestBase { "false AS _c5", "true AS _c6", "2E0 AS _c7", - "'TRUEX' AS _c8" + "'trueX' AS _c8" ) ) http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala index 1f5a069..7ad2212 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala @@ -129,7 +129,7 @@ class ScalarOperatorsTest extends ExpressionTestBase { testSqlApi( "CASE 1 WHEN 1, 2 THEN '1 or 2' WHEN 2 THEN 'not possible' WHEN 3, 2 THEN '3' " + "ELSE 'none of the above' END", - "1 or 2") + "1 or 2 ") testSqlApi("CASE WHEN 'a'='a' THEN 1 END", "1") testSqlApi("CASE 2 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END", "bcd") testSqlApi("CASE f2 WHEN 1 THEN 11 WHEN 2 THEN 4 ELSE NULL END", "11") http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala index d34e335..84b61da 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala @@ -28,12 +28,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{DataSet => JDataSet} import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} import org.apache.flink.api.table._ -import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction} +import org.apache.flink.api.table.codegen.{CodeGenerator, Compiler, GeneratedFunction} import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} import org.apache.flink.api.table.functions.UserDefinedFunction import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention} import org.apache.flink.api.table.plan.rules.FlinkRuleSets -import org.apache.flink.api.table.runtime.Compiler import org.apache.flink.api.table.typeutils.RowTypeInfo import org.junit.Assert._ import org.junit.{After, Before}
