Repository: flink
Updated Branches:
  refs/heads/master 59df4b75f -> b6a2dc345
[FLINK-7942] [table] Reduce aliasing in RexNodes This closes #5019. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6a2dc34 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6a2dc34 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6a2dc34 Branch: refs/heads/master Commit: b6a2dc345f37c4b643789e98d02e23a022d31415 Parents: 59df4b7 Author: twalthr <twal...@apache.org> Authored: Wed Nov 15 12:07:16 2017 +0100 Committer: twalthr <twal...@apache.org> Committed: Wed Nov 15 18:26:50 2017 +0100 ---------------------------------------------------------------------- .../org/apache/flink/table/api/table.scala | 12 ++++- .../flink/table/plan/logical/operators.scala | 26 +++++++++-- .../flink/table/api/batch/table/JoinTest.scala | 47 ++++++++++++++++++++ .../flink/table/plan/RetractionRulesTest.scala | 40 ++++++----------- 4 files changed, 93 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b6a2dc34/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala index 7349a0e..071cc69 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala @@ -1106,7 +1106,13 @@ class OverWindowedTable( new Table( table.tableEnv, - Project(expandedOverFields.map(UnresolvedAlias), table.logicalPlan).validate(table.tableEnv)) + Project( + expandedOverFields.map(UnresolvedAlias), + table.logicalPlan, + // required for proper projection push down + explicitAlias = true) + .validate(table.tableEnv) + ) } def select(fields: 
String): Table = { @@ -1150,7 +1156,9 @@ class WindowGroupedTable( propNames.map(a => Alias(a._1, a._2)).toSeq, aggNames.map(a => Alias(a._1, a._2)).toSeq, Project(projectFields, table.logicalPlan).validate(table.tableEnv) - ).validate(table.tableEnv) + ).validate(table.tableEnv), + // required for proper resolution of the time attribute in multi-windows + explicitAlias = true ).validate(table.tableEnv)) } http://git-wip-us.apache.org/repos/asf/flink/blob/b6a2dc34/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala index fe2bfe5..a2bd1e4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala @@ -42,7 +42,12 @@ import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess} import scala.collection.JavaConverters._ import scala.collection.mutable -case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { +case class Project( + projectList: Seq[NamedExpression], + child: LogicalNode, + explicitAlias: Boolean = false) + extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { @@ -61,7 +66,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend throw new RuntimeException("This should never be called and probably points to a bug.") } } - Project(newProjectList, child) + Project(newProjectList, child, explicitAlias) } override def validate(tableEnv: TableEnvironment): LogicalNode = { @@ -90,8 +95,19 @@ case class 
Project(projectList: Seq[NamedExpression], child: LogicalNode) extend override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { child.construct(relBuilder) + + val exprs = if (explicitAlias) { + projectList + } else { + // remove AS expressions, according to Calcite they should not be in a final RexNode + projectList.map { + case Alias(e: Expression, _, _) => e + case e: Expression => e + } + } + relBuilder.project( - projectList.map(_.toRexNode(relBuilder)).asJava, + exprs.map(_.toRexNode(relBuilder)).asJava, projectList.map(_.name).asJava, true) } @@ -116,7 +132,9 @@ case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends Una val input = child.output Project( names.zip(input).map { case (name, attr) => - Alias(attr, name)} ++ input.drop(names.length), child) + Alias(attr, name)} ++ input.drop(names.length), + child, + explicitAlias = true) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b6a2dc34/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala index 9ee7fc2..ce62252 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala @@ -19,7 +19,9 @@ package org.apache.flink.table.api.batch.table import org.apache.flink.api.scala._ +import org.apache.flink.table.api.batch.table.JoinTest.Merger import org.apache.flink.table.api.scala._ +import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.utils.TableTestBase import org.apache.flink.table.utils.TableTestUtil._ import org.junit.Test @@ -301,4 +303,49 @@ class JoinTest 
extends TableTestBase { util.verifyTable(joined, expected) } + + @Test + def testFilterJoinRule(): Unit = { + val util = batchTestUtil() + val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c) + val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f) + val results = t1 + .leftOuterJoin(t2, 'b === 'e) + .select('c, Merger('c, 'f) as 'c0) + .select(Merger('c, 'c0) as 'c1) + .where('c1 >= 0) + + val expected = unaryNode( + "DataSetCalc", + binaryNode( + "DataSetJoin", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "b", "c") + ), + unaryNode( + "DataSetCalc", + batchTableNode(1), + term("select", "e", "f") + ), + term("where", "=(b, e)"), + term("join", "b", "c", "e", "f"), + term("joinType", "LeftOuterJoin") + ), + term("select", "Merger$(c, Merger$(c, f)) AS c1"), + term("where", ">=(Merger$(c, Merger$(c, f)), 0)") + ) + + util.verifyTable(results, expected) + } +} + +object JoinTest { + + object Merger extends ScalarFunction { + def eval(f0: Int, f1: Int): Int = { + f0 + f1 + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b6a2dc34/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala index 999a808..ba3c314 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala @@ -86,22 +86,18 @@ class RetractionRulesTest extends TableTestBase { val expected = unaryNode( - "DataStreamCalc", + "DataStreamGroupAggregate", unaryNode( - "DataStreamGroupAggregate", + "DataStreamCalc", unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamGroupAggregate", - "DataStreamScan(true, 
Acc)", - "true, AccRetract" - ), + "DataStreamGroupAggregate", + "DataStreamScan(true, Acc)", "true, AccRetract" ), - s"$defaultStatus" + "true, AccRetract" ), s"$defaultStatus" - ) + ) util.verifyTableTrait(resultTable, expected) } @@ -253,28 +249,20 @@ class RetractionRulesTest extends TableTestBase { val expected = unaryNode( - "DataStreamCalc", + "DataStreamGroupAggregate", unaryNode( - "DataStreamGroupAggregate", - unaryNode( - "DataStreamCalc", - binaryNode( - "DataStreamUnion", - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamGroupAggregate", - "DataStreamScan(true, Acc)", - "true, AccRetract" - ), - "true, AccRetract" - ), + "DataStreamCalc", + binaryNode( + "DataStreamUnion", + unaryNode( + "DataStreamGroupAggregate", "DataStreamScan(true, Acc)", "true, AccRetract" ), + "DataStreamScan(true, Acc)", "true, AccRetract" ), - s"$defaultStatus" + "true, AccRetract" ), s"$defaultStatus" )