Repository: flink Updated Branches: refs/heads/master 3d5bca0ab -> c7d1a3b8d
[FLINK-4791] [table] Fix issues caused by expression reduction Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7d1a3b8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7d1a3b8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7d1a3b8 Branch: refs/heads/master Commit: c7d1a3b8dfe8d31c77e2ba0afb8bf71674134427 Parents: 3d5bca0 Author: twalthr <[email protected]> Authored: Thu Oct 13 16:19:31 2016 +0200 Committer: twalthr <[email protected]> Committed: Thu Oct 13 16:26:17 2016 +0200 ---------------------------------------------------------------------- docs/dev/table_api.md | 44 ++++++++++++++++++++ .../flink/api/scala/table/expressionDsl.scala | 10 +++++ .../flink/api/table/FlinkTypeSystem.scala | 6 +++ .../api/table/expressions/comparison.scala | 20 +++++++++ .../flink/api/table/expressions/time.scala | 4 +- .../table/plan/nodes/dataset/DataSetCalc.scala | 2 +- .../api/table/plan/rules/FlinkRuleSets.scala | 16 +++---- .../api/table/validate/FunctionCatalog.scala | 2 + .../api/table/ExpressionReductionTest.scala | 5 ++- .../table/expressions/ScalarFunctionsTest.scala | 39 ++++++++++------- .../expressions/utils/ExpressionTestBase.scala | 43 ++++++++++++++----- 11 files changed, 155 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c7d1a3b8/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 2d6d6ce..91ccd8a 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1278,6 +1278,28 @@ BOOLEAN.isFalse <tr> <td> {% highlight java %} +BOOLEAN.isNotTrue +{% endhighlight %} + </td> + <td> + <p>Returns true if the given boolean expression is not true (for null and false). 
False otherwise.</p> + </td> + </tr> + + <tr> + <td> + {% highlight java %} +BOOLEAN.isNotFalse +{% endhighlight %} + </td> + <td> + <p>Returns true if given boolean expression is not false (for null and true). False otherwise.</p> + </td> + </tr> + + <tr> + <td> + {% highlight java %} NUMERIC.exp() {% endhighlight %} </td> @@ -1697,6 +1719,28 @@ BOOLEAN.isFalse <tr> <td> {% highlight scala %} +BOOLEAN.isNotTrue +{% endhighlight %} + </td> + <td> + <p>Returns true if the given boolean expression is not true (for null and false). False otherwise.</p> + </td> + </tr> + + <tr> + <td> + {% highlight scala %} +BOOLEAN.isNotFalse +{% endhighlight %} + </td> + <td> + <p>Returns true if given boolean expression is not false (for null and true). False otherwise.</p> + </td> + </tr> + + <tr> + <td> + {% highlight scala %} NUMERIC.exp() {% endhighlight %} </td> http://git-wip-us.apache.org/repos/asf/flink/blob/c7d1a3b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala index 9c2721b..77e5b44 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala @@ -73,6 +73,16 @@ trait ImplicitExpressionOperations { */ def isFalse = IsFalse(expr) + /** + * Returns true if given boolean expression is not true (for null and false). False otherwise. + */ + def isNotTrue = IsNotTrue(expr) + + /** + * Returns true if given boolean expression is not false (for null and true). False otherwise. 
+ */ + def isNotFalse = IsNotFalse(expr) + def + (other: Expression) = Plus(expr, other) def - (other: Expression) = Minus(expr, other) def / (other: Expression) = Div(expr, other) http://git-wip-us.apache.org/repos/asf/flink/blob/c7d1a3b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala index 2df043f..3222eee 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala @@ -35,9 +35,15 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl { override def getMaxNumericPrecision: Int = Int.MaxValue / 2 override def getDefaultPrecision(typeName: SqlTypeName): Int = typeName match { + // by default all VARCHARs can have the Java default length case SqlTypeName.VARCHAR => Int.MaxValue + + // we currently support only timestamps with milliseconds precision + case SqlTypeName.TIMESTAMP => + 3 + case _ => super.getDefaultPrecision(typeName) } http://git-wip-us.apache.org/repos/asf/flink/blob/c7d1a3b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala index ad01674..babb677 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala @@ -141,3 
+141,23 @@ case class IsFalse(child: Expression) extends UnaryExpression { override private[flink] def resultType = BOOLEAN_TYPE_INFO } + +case class IsNotTrue(child: Expression) extends UnaryExpression { + override def toString = s"($child).isNotTrue" + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder.call(SqlStdOperatorTable.IS_NOT_TRUE, child.toRexNode) + } + + override private[flink] def resultType = BOOLEAN_TYPE_INFO +} + +case class IsNotFalse(child: Expression) extends UnaryExpression { + override def toString = s"($child).isNotFalse" + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder.call(SqlStdOperatorTable.IS_NOT_FALSE, child.toRexNode) + } + + override private[flink] def resultType = BOOLEAN_TYPE_INFO +} http://git-wip-us.apache.org/repos/asf/flink/blob/c7d1a3b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala index 1f6361e..488fd33 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala @@ -355,13 +355,13 @@ case class TemporalOverlaps( : RexNode = { // leftT = leftP + leftT if leftT is an interval val convLeftT = if (isTimeInterval(leftTemporal.resultType)) { - relBuilder.call(SqlStdOperatorTable.PLUS, leftP, leftT) + relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, leftP, leftT) } else { leftT } // rightT = rightP + rightT if rightT is an interval val convRightT = if (isTimeInterval(rightTemporal.resultType)) { - relBuilder.call(SqlStdOperatorTable.PLUS, rightP, rightT) + 
relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, rightP, rightT) } else { rightT } http://git-wip-us.apache.org/repos/asf/flink/blob/c7d1a3b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala index 6d10089..b8b74ad 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala @@ -41,7 +41,7 @@ class DataSetCalc( traitSet: RelTraitSet, input: RelNode, rowRelDataType: RelDataType, - calcProgram: RexProgram, + private[flink] val calcProgram: RexProgram, // for tests ruleDescription: String) extends SingleRel(cluster, traitSet, input) with FlinkCalc http://git-wip-us.apache.org/repos/asf/flink/blob/c7d1a3b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala index 03cb68c..3ed4385 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -75,10 +75,11 @@ object FlinkRuleSets { SortRemoveRule.INSTANCE, // simplify expressions rules - ReduceExpressionsRule.FILTER_INSTANCE, - ReduceExpressionsRule.PROJECT_INSTANCE, - ReduceExpressionsRule.CALC_INSTANCE, - 
ReduceExpressionsRule.JOIN_INSTANCE, + // TODO uncomment if FLINK-4825 is solved + // ReduceExpressionsRule.FILTER_INSTANCE, + // ReduceExpressionsRule.PROJECT_INSTANCE, + // ReduceExpressionsRule.CALC_INSTANCE, + // ReduceExpressionsRule.JOIN_INSTANCE, // prune empty results rules PruneEmptyRules.AGGREGATE_INSTANCE, @@ -136,9 +137,10 @@ object FlinkRuleSets { ProjectRemoveRule.INSTANCE, // simplify expressions rules - ReduceExpressionsRule.FILTER_INSTANCE, - ReduceExpressionsRule.PROJECT_INSTANCE, - ReduceExpressionsRule.CALC_INSTANCE, + // TODO uncomment if FLINK-4825 is solved + // ReduceExpressionsRule.FILTER_INSTANCE, + // ReduceExpressionsRule.PROJECT_INSTANCE, + // ReduceExpressionsRule.CALC_INSTANCE, // merge and push unions rules UnionEliminatorRule.INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/c7d1a3b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala index 9c66730..68e2f97 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala @@ -123,6 +123,8 @@ object FunctionCatalog { "isNotNull" -> classOf[IsNotNull], "isTrue" -> classOf[IsTrue], "isFalse" -> classOf[IsFalse], + "isNotTrue" -> classOf[IsNotTrue], + "isNotFalse" -> classOf[IsNotFalse], // aggregate functions "avg" -> classOf[Avg], http://git-wip-us.apache.org/repos/asf/flink/blob/c7d1a3b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala index 2d4694e..2f98b80 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala @@ -21,9 +21,10 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.utils.TableTestBase import org.apache.flink.api.table.utils.TableTestUtil._ -import org.junit.Test - +import org.junit.{Ignore, Test} +// TODO enable if FLINK-4825 is solved +@Ignore class ExpressionReductionTest extends TableTestBase { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/c7d1a3b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala index beead51..c930454 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala @@ -845,13 +845,24 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") testAllApis( - temporalOverlaps("2011-03-10 02:02:02.001".toTimestamp, 0.milli, - "2011-03-10 02:02:02.002".toTimestamp, "2011-03-10 02:02:02.002".toTimestamp), - "temporalOverlaps('2011-03-10 02:02:02.001'.toTimestamp, 0.milli, " + - "'2011-03-10 02:02:02.002'.toTimestamp, '2011-03-10 02:02:02.002'.toTimestamp)", - "(TIMESTAMP '2011-03-10 02:02:02.001', INTERVAL 
'0' SECOND) OVERLAPS " + - "(TIMESTAMP '2011-03-10 02:02:02.002', TIMESTAMP '2011-03-10 02:02:02.002')", + temporalOverlaps("2011-03-10 05:02:02".toTimestamp, 0.milli, + "2011-03-10 05:02:02".toTimestamp, "2011-03-10 05:02:01".toTimestamp), + "temporalOverlaps('2011-03-10 05:02:02'.toTimestamp, 0.milli, " + + "'2011-03-10 05:02:02'.toTimestamp, '2011-03-10 05:02:01'.toTimestamp)", + "(TIMESTAMP '2011-03-10 05:02:02', INTERVAL '0' SECOND) OVERLAPS " + + "(TIMESTAMP '2011-03-10 05:02:02', TIMESTAMP '2011-03-10 05:02:01')", "false") + + // TODO enable once CALCITE-1435 is fixed + // comparison of timestamps based on milliseconds is buggy + //testAllApis( + // temporalOverlaps("2011-03-10 02:02:02.001".toTimestamp, 0.milli, + // "2011-03-10 02:02:02.002".toTimestamp, "2011-03-10 02:02:02.002".toTimestamp), + // "temporalOverlaps('2011-03-10 02:02:02.001'.toTimestamp, 0.milli, " + + // "'2011-03-10 02:02:02.002'.toTimestamp, '2011-03-10 02:02:02.002'.toTimestamp)", + // "(TIMESTAMP '2011-03-10 02:02:02.001', INTERVAL '0' SECOND) OVERLAPS " + + // "(TIMESTAMP '2011-03-10 02:02:02.002', TIMESTAMP '2011-03-10 02:02:02.002')", + // "false") } @Test @@ -906,26 +917,26 @@ class ScalarFunctionsTest extends ExpressionTestBase { "false") testAllApis( - !'f1.isTrue, - "!f1.isTrue", + 'f1.isNotTrue, + "f1.isNotTrue", "f1 IS NOT TRUE", "false") testAllApis( - !'f21.isTrue, - "!f21.isTrue", + 'f21.isNotTrue, + "f21.isNotTrue", "f21 IS NOT TRUE", "true") testAllApis( - !false.isFalse, - "!false.isFalse", + false.isNotFalse, + "false.isNotFalse", "FALSE IS NOT FALSE", "false") testAllApis( - !'f21.isFalse, - "!f21.isFalse", + 'f21.isNotFalse, + "f21.isNotFalse", "f21 IS NOT FALSE", "true") } http://git-wip-us.apache.org/repos/asf/flink/blob/c7d1a3b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala index 605fdd0..ee67ffb 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala @@ -21,7 +21,8 @@ package org.apache.flink.api.table.expressions.utils import org.apache.calcite.rel.logical.LogicalProject import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.`type`.SqlTypeName._ -import org.apache.calcite.tools.RelBuilder +import org.apache.calcite.sql2rel.RelDecorrelator +import org.apache.calcite.tools.{Programs, RelBuilder} import org.apache.flink.api.common.functions.{Function, MapFunction} import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation @@ -31,6 +32,8 @@ import org.apache.flink.api.table._ import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction} import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} import org.apache.flink.api.table.functions.UserDefinedFunction +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention} +import org.apache.flink.api.table.plan.rules.FlinkRuleSets import org.apache.flink.api.table.runtime.FunctionCompiler import org.apache.flink.api.table.typeutils.RowTypeInfo import org.junit.Assert._ @@ -44,7 +47,7 @@ import scala.collection.mutable */ abstract class ExpressionTestBase { - private val testExprs = mutable.LinkedHashSet[(RexNode, String)]() + private val testExprs = mutable.ArrayBuffer[(RexNode, String)]() // setup test utils private val tableName = "testTable" @@ -53,6 +56,7 @@ abstract class ExpressionTestBase { context._2.getFrameworkConfig, context._2.getPlanner, 
context._2.getTypeFactory) + private val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES) private def prepareContext(typeInfo: TypeInformation[Any]): (RelBuilder, TableEnvironment) = { // create DataSetTable @@ -134,24 +138,43 @@ abstract class ExpressionTestBase { // create RelNode from SQL expression val parsed = planner.parse(s"SELECT $sqlExpr FROM $tableName") val validated = planner.validate(parsed) - val converted = planner.rel(validated) + val converted = planner.rel(validated).rel + + // create DataSetCalc + val decorPlan = RelDecorrelator.decorrelateQuery(converted) + val flinkOutputProps = converted.getTraitSet.replace(DataSetConvention.INSTANCE).simplify() + val dataSetCalc = optProgram.run(context._2.getPlanner, decorPlan, flinkOutputProps) // extract RexNode - val expr: RexNode = converted.rel.asInstanceOf[LogicalProject].getChildExps.get(0) - testExprs.add((expr, expected)) + val calcProgram = dataSetCalc + .asInstanceOf[DataSetCalc] + .calcProgram + val expanded = calcProgram.expandLocalRef(calcProgram.getProjectList.get(0)) + + testExprs += ((expanded, expected)) } private def addTableApiTestExpr(tableApiExpr: Expression, expected: String): Unit = { + // create RelNode from Table API expression val env = context._2 - val expr = env + val converted = env .asInstanceOf[BatchTableEnvironment] .scan(tableName) .select(tableApiExpr) .getRelNode - .asInstanceOf[LogicalProject] - .getChildExps - .get(0) - testExprs.add((expr, expected)) + + // create DataSetCalc + val decorPlan = RelDecorrelator.decorrelateQuery(converted) + val flinkOutputProps = converted.getTraitSet.replace(DataSetConvention.INSTANCE).simplify() + val dataSetCalc = optProgram.run(context._2.getPlanner, decorPlan, flinkOutputProps) + + // extract RexNode + val calcProgram = dataSetCalc + .asInstanceOf[DataSetCalc] + .calcProgram + val expanded = calcProgram.expandLocalRef(calcProgram.getProjectList.get(0)) + + testExprs += ((expanded, expected)) } private def 
addTableApiTestExpr(tableApiString: String, expected: String): Unit = {
