Repository: flink
Updated Branches:
  refs/heads/master 910f733f5 -> ecfb5b5f6


[FLINK-4825] [table] Implement a RexExecutor that uses Flink's code generation.

This closes #2884
This closes #2874 (closing PR with Public API breaking changes)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db441dec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db441dec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db441dec

Branch: refs/heads/master
Commit: db441decb41bf856400766023bfc7de77d6041aa
Parents: 910f733
Author: twalthr <[email protected]>
Authored: Mon Nov 28 12:18:36 2016 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Tue Nov 29 13:14:40 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/FlinkRelBuilder.scala       |   1 +
 .../flink/api/table/TableEnvironment.scala      |   5 +-
 .../flink/api/table/codegen/Compiler.scala      |  41 +++++++
 .../api/table/codegen/ExpressionReducer.scala   | 117 +++++++++++++++++++
 .../api/table/plan/rules/FlinkRuleSets.scala    |  16 ++-
 .../flink/api/table/runtime/Compiler.scala      |  42 -------
 .../api/table/runtime/FlatJoinRunner.scala      |   1 +
 .../flink/api/table/runtime/FlatMapRunner.scala |   1 +
 .../flink/api/table/runtime/MapRunner.scala     |   1 +
 .../table/runtime/io/ValuesInputFormat.scala    |   2 +-
 .../api/scala/batch/sql/SetOperatorsTest.scala  |   2 +-
 .../api/table/ExpressionReductionTest.scala     |  20 ++--
 .../table/expressions/ScalarOperatorsTest.scala |   2 +-
 .../expressions/utils/ExpressionTestBase.scala  |   3 +-
 14 files changed, 185 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index ea4eed0..da44ebb 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -80,6 +80,7 @@ object FlinkRelBuilder {
 
     // create context instances with Flink type factory
     val planner = new VolcanoPlanner(Contexts.empty())
+    planner.setExecutor(config.getExecutor)
     planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
     val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
     val calciteSchema = CalciteSchema.from(config.getDefaultSchema)

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index e8734f5..7b2b738 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.config.Lex
 import org.apache.calcite.plan.RelOptPlanner
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.RexExecutorImpl
-import org.apache.calcite.schema.{Schemas, SchemaPlus}
+import org.apache.calcite.schema.{SchemaPlus, Schemas}
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql.SqlOperatorTable
 import org.apache.calcite.sql.parser.SqlParser
@@ -38,6 +38,7 @@ import org.apache.flink.api.java.{ExecutionEnvironment => 
JavaBatchExecEnv}
 import org.apache.flink.api.scala.table.{BatchTableEnvironment => 
ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
+import org.apache.flink.api.table.codegen.ExpressionReducer
 import org.apache.flink.api.table.expressions.{Alias, Expression, 
UnresolvedFieldReference}
 import org.apache.flink.api.table.functions.{ScalarFunction, 
UserDefinedFunction}
 import org.apache.flink.api.table.plan.cost.DataSetCostFactory
@@ -71,7 +72,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     .typeSystem(new FlinkTypeSystem)
     .operatorTable(getSqlOperatorTable)
     // set the executor to evaluate constant expressions
-    .executor(new RexExecutorImpl(Schemas.createDataContext(null)))
+    .executor(new ExpressionReducer(config))
     .build
 
   // the builder for Calcite RelNodes, Calcite's representation of a 
relational expression tree.

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala
new file mode 100644
index 0000000..fce13ba
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.codehaus.commons.compiler.CompileException
+import org.codehaus.janino.SimpleCompiler
+
+trait Compiler[T] {
+
+  @throws(classOf[CompileException])
+  def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
+    require(cl != null, "Classloader must not be null.")
+    val compiler = new SimpleCompiler()
+    compiler.setParentClassLoader(cl)
+    try {
+      compiler.cook(code)
+    } catch {
+      case e: CompileException =>
+        throw new InvalidProgramException("Table program cannot be compiled. " 
+
+          "This is a bug. Please file an issue.", e)
+    }
+    compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
new file mode 100644
index 0000000..74756ef
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import java.util
+
+import org.apache.calcite.plan.RelOptPlanner
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableConfig}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Evaluates constant expressions using Flink's [[CodeGenerator]].
+  */
+class ExpressionReducer(config: TableConfig)
+  extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] {
+
+  private val EMPTY_ROW_INFO = TypeConverter.DEFAULT_ROW_TYPE
+  private val EMPTY_ROW = new Row(0)
+
+  override def reduce(
+    rexBuilder: RexBuilder,
+    constExprs: util.List[RexNode],
+    reducedValues: util.List[RexNode]): Unit = {
+
+    val typeFactory = rexBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    val literals = constExprs.asScala.map(e => (e.getType.getSqlTypeName, 
e)).flatMap {
+
+      // we need to cast here for RexBuilder.makeLiteral
+      case (SqlTypeName.DATE, e) =>
+        Some(
+          
rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO),
 e)
+        )
+      case (SqlTypeName.TIME, e) =>
+        Some(
+          
rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO),
 e)
+        )
+      case (SqlTypeName.TIMESTAMP, e) =>
+        Some(
+          
rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO),
 e)
+        )
+
+      // we don't support object literals yet, we skip those constant 
expressions
+      case (SqlTypeName.ANY, _) | (SqlTypeName.ROW, _) => None
+
+      case (_, e) => Some(e)
+    }
+
+    val literalTypes = literals.map(e => 
FlinkTypeFactory.toTypeInfo(e.getType))
+    val resultType = new RowTypeInfo(literalTypes)
+
+    // generate MapFunction
+    val generator = new CodeGenerator(config, false, EMPTY_ROW_INFO)
+
+    val result = generator.generateResultExpression(
+      resultType,
+      resultType.getFieldNames,
+      literals)
+
+    val generatedFunction = generator.generateFunction[MapFunction[Row, Row]](
+      "ExpressionReducer",
+      classOf[MapFunction[Row, Row]],
+      s"""
+        |${result.code}
+        |return ${result.resultTerm};
+        |""".stripMargin,
+      resultType.asInstanceOf[TypeInformation[Any]])
+
+    val clazz = compile(getClass.getClassLoader, generatedFunction.name, 
generatedFunction.code)
+    val function = clazz.newInstance()
+
+    // execute
+    val reduced = function.map(EMPTY_ROW)
+
+    // add the reduced results or keep them unreduced
+    var i = 0
+    var reducedIdx = 0
+    while (i < constExprs.size()) {
+      val unreduced = constExprs.get(i)
+      unreduced.getType.getSqlTypeName match {
+        // we insert the original expression for object literals
+        case SqlTypeName.ANY | SqlTypeName.ROW =>
+          reducedValues.add(unreduced)
+        case _ =>
+          val literal = rexBuilder.makeLiteral(
+            reduced.productElement(reducedIdx),
+            unreduced.getType,
+            true)
+          reducedValues.add(literal)
+          reducedIdx += 1
+      }
+      i += 1
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 638deac..5653083 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -75,11 +75,10 @@ object FlinkRuleSets {
     SortRemoveRule.INSTANCE,
 
     // simplify expressions rules
-    // TODO uncomment if FLINK-4825 is solved
-    // ReduceExpressionsRule.FILTER_INSTANCE,
-    // ReduceExpressionsRule.PROJECT_INSTANCE,
-    // ReduceExpressionsRule.CALC_INSTANCE,
-    // ReduceExpressionsRule.JOIN_INSTANCE,
+    ReduceExpressionsRule.FILTER_INSTANCE,
+    ReduceExpressionsRule.PROJECT_INSTANCE,
+    ReduceExpressionsRule.CALC_INSTANCE,
+    ReduceExpressionsRule.JOIN_INSTANCE,
 
     // prune empty results rules
     PruneEmptyRules.AGGREGATE_INSTANCE,
@@ -137,10 +136,9 @@ object FlinkRuleSets {
       ProjectRemoveRule.INSTANCE,
 
       // simplify expressions rules
-      // TODO uncomment if FLINK-4825 is solved
-      // ReduceExpressionsRule.FILTER_INSTANCE,
-      // ReduceExpressionsRule.PROJECT_INSTANCE,
-      // ReduceExpressionsRule.CALC_INSTANCE,
+      ReduceExpressionsRule.FILTER_INSTANCE,
+      ReduceExpressionsRule.PROJECT_INSTANCE,
+      ReduceExpressionsRule.CALC_INSTANCE,
 
       // merge and push unions rules
       UnionEliminatorRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
deleted file mode 100644
index c5d566e..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/Compiler.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.common.functions.Function
-import org.codehaus.commons.compiler.CompileException
-import org.codehaus.janino.SimpleCompiler
-
-trait Compiler[T] {
-
-  @throws(classOf[CompileException])
-  def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
-    require(cl != null, "Classloader must not be null.")
-    val compiler = new SimpleCompiler()
-    compiler.setParentClassLoader(cl)
-    try {
-      compiler.cook(code)
-    } catch {
-      case e: CompileException =>
-        throw new InvalidProgramException("Table program cannot be compiled. " 
+
-          "This is a bug. Please file an issue.", e)
-    }
-    compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
index c6a8fe8..2e57a0f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatJoinRunner.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime
 import org.apache.flink.api.common.functions.{FlatJoinFunction, 
RichFlatJoinFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.table.codegen.Compiler
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.Collector
 import org.slf4j.LoggerFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
index 2e942eb..e228e2b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime
 import org.apache.flink.api.common.functions.{FlatMapFunction, 
RichFlatMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.table.codegen.Compiler
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.Collector
 import org.slf4j.LoggerFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
index 944b415..9fd1876 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.table.runtime
 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.api.table.codegen.Compiler
 import org.apache.flink.configuration.Configuration
 import org.slf4j.LoggerFactory
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
index 34bff15..2a4be46 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.io
 import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.runtime.Compiler
+import org.apache.flink.api.table.codegen.Compiler
 import org.apache.flink.core.io.GenericInputSplit
 import org.slf4j.LoggerFactory
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
index 5bc6e4a..7b2b497 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
@@ -54,7 +54,7 @@ class SetOperatorsTest extends TableTestBase {
               term("join", "b_long", "b_int", "b_string", "a_long"),
               term("joinType", "InnerJoin")
             ),
-            term("select", "a_long", "true AS $f0")
+            term("select", "true AS $f0", "a_long")
           ),
           term("groupBy", "a_long"),
           term("select", "a_long", "MIN($f0) AS $f1")

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
index 9694687..b8156a2 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
@@ -21,10 +21,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.utils.TableTestBase
 import org.apache.flink.api.table.utils.TableTestUtil._
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
-// TODO enable if FLINK-4825 is solved
-@Ignore
 class ExpressionReductionTest extends TableTestBase {
 
   @Test
@@ -64,7 +62,7 @@ class ExpressionReductionTest extends TableTestBase {
         "true AS EXPR$9",
         "2 AS EXPR$10",
         "true AS EXPR$11",
-        "'TRUEX' AS EXPR$12"
+        "'trueX' AS EXPR$12"
       ),
       term("where", ">(a, 8)")
     )
@@ -109,7 +107,7 @@ class ExpressionReductionTest extends TableTestBase {
         "true AS EXPR$9",
         "2 AS EXPR$10",
         "true AS EXPR$11",
-        "'TRUEX' AS EXPR$12"
+        "'trueX' AS EXPR$12"
       )
     )
 
@@ -164,7 +162,7 @@ class ExpressionReductionTest extends TableTestBase {
         "false AS _c5",
         "true AS _c6",
         "2E0 AS _c7",
-        "'TRUEX' AS _c8"
+        "'trueX' AS _c8"
       ),
       term("where", ">(a, 8)")
     )
@@ -200,7 +198,7 @@ class ExpressionReductionTest extends TableTestBase {
         "false AS _c5",
         "true AS _c6",
         "2E0 AS _c7",
-        "'TRUEX' AS _c8"
+        "'trueX' AS _c8"
       )
     )
 
@@ -262,7 +260,7 @@ class ExpressionReductionTest extends TableTestBase {
         "true AS EXPR$9",
         "2 AS EXPR$10",
         "true AS EXPR$11",
-        "'TRUEX' AS EXPR$12"
+        "'trueX' AS EXPR$12"
       ),
       term("where", ">(a, 8)")
     )
@@ -307,7 +305,7 @@ class ExpressionReductionTest extends TableTestBase {
         "true AS EXPR$9",
         "2 AS EXPR$10",
         "true AS EXPR$11",
-        "'TRUEX' AS EXPR$12"
+        "'trueX' AS EXPR$12"
       )
     )
 
@@ -362,7 +360,7 @@ class ExpressionReductionTest extends TableTestBase {
         "false AS _c5",
         "true AS _c6",
         "2E0 AS _c7",
-        "'TRUEX' AS _c8"
+        "'trueX' AS _c8"
       ),
       term("where", ">(a, 8)")
     )
@@ -398,7 +396,7 @@ class ExpressionReductionTest extends TableTestBase {
         "false AS _c5",
         "true AS _c6",
         "2E0 AS _c7",
-        "'TRUEX' AS _c8"
+        "'trueX' AS _c8"
       )
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
index 1f5a069..7ad2212 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
@@ -129,7 +129,7 @@ class ScalarOperatorsTest extends ExpressionTestBase {
     testSqlApi(
       "CASE 1 WHEN 1, 2 THEN '1 or 2' WHEN 2 THEN 'not possible' WHEN 3, 2 
THEN '3' " +
       "ELSE 'none of the above' END",
-      "1 or 2")
+      "1 or 2           ")
     testSqlApi("CASE WHEN 'a'='a' THEN 1 END", "1")
     testSqlApi("CASE 2 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END", "bcd")
     testSqlApi("CASE f2 WHEN 1 THEN 11 WHEN 2 THEN 4 ELSE NULL END", "11")

http://git-wip-us.apache.org/repos/asf/flink/blob/db441dec/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
index d34e335..84b61da 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
@@ -28,12 +28,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{DataSet => JDataSet}
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
 import org.apache.flink.api.table._
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.api.table.codegen.{CodeGenerator, Compiler, 
GeneratedFunction}
 import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.api.table.functions.UserDefinedFunction
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, 
DataSetConvention}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.runtime.Compiler
 import org.apache.flink.api.table.typeutils.RowTypeInfo
 import org.junit.Assert._
 import org.junit.{After, Before}

Reply via email to