[FLINK-6089] [table] Add decoration phase for stream queries to rewrite plans after the cost-based optimization.
This closes #3564. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6949c8c7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6949c8c7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6949c8c7 Branch: refs/heads/table-retraction Commit: 6949c8c79c41344023df08dde2936f06daa00e0d Parents: f97deaa Author: hequn.chq <[email protected]> Authored: Thu Mar 16 11:11:17 2017 +0800 Committer: Fabian Hueske <[email protected]> Committed: Fri Mar 24 20:19:17 2017 +0100 ---------------------------------------------------------------------- .../table/api/StreamTableEnvironment.scala | 38 ++++++++- .../flink/table/calcite/CalciteConfig.scala | 89 +++++++++++++++++--- .../flink/table/plan/rules/FlinkRuleSets.scala | 9 +- .../flink/table/CalciteConfigBuilderTest.scala | 69 +++++++++++++++ 4 files changed, 188 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index d927c3a..225a675 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -25,7 +25,7 @@ import org.apache.calcite.plan.hep.HepMatchOrder import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql2rel.RelDecorrelator -import org.apache.calcite.tools.RuleSet +import org.apache.calcite.tools.{RuleSet, RuleSets} import 
org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.streaming.api.datastream.DataStream @@ -39,6 +39,8 @@ import org.apache.flink.table.sinks.{StreamTableSink, TableSink} import org.apache.flink.table.sources.{StreamTableSource, TableSource} import org.apache.flink.types.Row +import _root_.scala.collection.JavaConverters._ + /** * The base class for stream TableEnvironments. * @@ -211,6 +213,26 @@ abstract class StreamTableEnvironment( } /** + * Returns the decoration rule set for this environment + * including a custom RuleSet configuration. + */ + protected def getDecoRuleSet: RuleSet = { + val calciteConfig = config.getCalciteConfig + calciteConfig.getDecoRuleSet match { + + case None => + getBuiltInDecoRuleSet + + case Some(ruleSet) => + if (calciteConfig.replacesDecoRuleSet) { + ruleSet + } else { + RuleSets.ofList((getBuiltInDecoRuleSet.asScala ++ ruleSet.asScala).asJava) + } + } + } + + /** * Returns the built-in normalization rules that are defined by the environment. */ protected def getBuiltInNormRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_NORM_RULES @@ -221,6 +243,11 @@ abstract class StreamTableEnvironment( protected def getBuiltInOptRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES /** + * Returns the built-in decoration rules that are defined by the environment. + */ + protected def getBuiltInDecoRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_DECO_RULES + + /** * Generates the optimized [[RelNode]] tree from the original relational node tree. * * @param relNode The root node of the relational expression tree. @@ -248,7 +275,14 @@ abstract class StreamTableEnvironment( normalizedPlan } - optimizedPlan + // 4. 
decorate the optimized plan + val decoRuleSet = getDecoRuleSet + val decoratedPlan = if (decoRuleSet.iterator().hasNext) { + runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, optimizedPlan, optimizedPlan.getTraitSet) + } else { + optimizedPlan + } + decoratedPlan } http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala index 65a61b2..ba8df81 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala @@ -31,15 +31,36 @@ import scala.collection.JavaConverters._ * Builder for creating a Calcite configuration. */ class CalciteConfigBuilder { + + /** + * Defines the normalization rule set. Normalization rules are dedicated to rewriting + * the predicated logical plan before Volcano optimization. + */ private var replaceNormRules: Boolean = false private var normRuleSets: List[RuleSet] = Nil + /** + * Defines the optimization rule set. Optimization rules are used during Volcano optimization. + */ private var replaceOptRules: Boolean = false private var optRuleSets: List[RuleSet] = Nil + /** + * Defines the decoration rule set. Decoration rules are dedicated to rewriting the predicated + * logical plan after Volcano optimization. + */ + private var replaceDecoRules: Boolean = false + private var decoRuleSets: List[RuleSet] = Nil + + /** + * Defines the SQL operator tables. + */ private var replaceOperatorTable: Boolean = false private var operatorTables: List[SqlOperatorTable] = Nil + /** + * Defines a SQL parser configuration. 
+ */ private var replaceSqlParserConfig: Option[SqlParser.Config] = None /** @@ -81,6 +102,32 @@ class CalciteConfigBuilder { } /** + * Replaces the built-in decoration rule set with the given rule set. + * + * The decoration rules are applied after the cost-based optimization phase. + * The decoration phase allows rewriting the optimized plan and is not cost-based. + * + */ + def replaceDecoRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = { + Preconditions.checkNotNull(replaceRuleSet) + decoRuleSets = List(replaceRuleSet) + replaceDecoRules = true + this + } + + /** + * Appends the given decoration rule set to the built-in rule set. + * + * The decoration rules are applied after the cost-based optimization phase. + * The decoration phase allows rewriting the optimized plan and is not cost-based. + */ + def addDecoRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = { + Preconditions.checkNotNull(addedRuleSet) + decoRuleSets = addedRuleSet :: decoRuleSets + this + } + + /** * Replaces the built-in SQL operator table with the given table. */ def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = { @@ -113,35 +160,39 @@ class CalciteConfigBuilder { val replacesNormRuleSet: Boolean, val getOptRuleSet: Option[RuleSet], val replacesOptRuleSet: Boolean, + val getDecoRuleSet: Option[RuleSet], + val replacesDecoRuleSet: Boolean, val getSqlOperatorTable: Option[SqlOperatorTable], val replacesSqlOperatorTable: Boolean, val getSqlParserConfig: Option[SqlParser.Config]) extends CalciteConfig + /** - * Builds a new [[CalciteConfig]]. 
+ * Convert the [[RuleSet]] List to [[Option]] type */ - def build(): CalciteConfig = new CalciteConfigImpl( - normRuleSets match { + private def getRuleSet(inputRuleSet: List[RuleSet]): Option[RuleSet] = { + inputRuleSet match { case Nil => None case h :: Nil => Some(h) case _ => // concat rule sets val concatRules = - normRuleSets.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c) + inputRuleSet.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c) Some(RuleSets.ofList(concatRules.asJava)) - }, + } + } + + /** + * Builds a new [[CalciteConfig]]. + */ + def build(): CalciteConfig = new CalciteConfigImpl( + getRuleSet(normRuleSets), replaceNormRules, - optRuleSets match { - case Nil => None - case h :: Nil => Some(h) - case _ => - // concat rule sets - val concatRules = - optRuleSets.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c) - Some(RuleSets.ofList(concatRules.asJava)) - }, + getRuleSet(optRuleSets), replaceOptRules, + getRuleSet(decoRuleSets), + replaceDecoRules, operatorTables match { case Nil => None case h :: Nil => Some(h) @@ -179,6 +230,16 @@ trait CalciteConfig { def getOptRuleSet: Option[RuleSet] /** + * Returns whether this configuration replaces the built-in decoration rule set. + */ + def replacesDecoRuleSet: Boolean + + /** + * Returns a custom decoration rule set. + */ + def getDecoRuleSet: Option[RuleSet] + + /** * Returns whether this configuration replaces the built-in SQL operator table. 
*/ def replacesSqlOperatorTable: Boolean http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index 952ee34..1301c8d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -23,7 +23,6 @@ import org.apache.calcite.tools.{RuleSet, RuleSets} import org.apache.flink.table.calcite.rules.{FlinkAggregateExpandDistinctAggregatesRule, FlinkAggregateJoinTransposeRule} import org.apache.flink.table.plan.rules.dataSet._ import org.apache.flink.table.plan.rules.datastream._ -import org.apache.flink.table.plan.rules.datastream.{DataStreamCalcRule, DataStreamScanRule, DataStreamUnionRule} object FlinkRuleSets { @@ -186,4 +185,12 @@ object FlinkRuleSets { PushFilterIntoStreamTableSourceScanRule.INSTANCE ) + /** + * RuleSet to decorate plans for stream / DataStream execution + */ + val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList( + // rules + + ) + } http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala index 6c07e28..d0de8fa 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala @@ -39,6 +39,9 @@ class CalciteConfigBuilderTest { assertFalse(cc.replacesOptRuleSet) assertFalse(cc.getOptRuleSet.isDefined) + + assertFalse(cc.replacesDecoRuleSet) + assertFalse(cc.getDecoRuleSet.isDefined) } @Test @@ -47,6 +50,7 @@ class CalciteConfigBuilderTest { val cc: CalciteConfig = new CalciteConfigBuilder() .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) .replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) + .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) .build() assertFalse(cc.replacesNormRuleSet) @@ -54,6 +58,9 @@ class CalciteConfigBuilderTest { assertTrue(cc.replacesOptRuleSet) assertTrue(cc.getOptRuleSet.isDefined) + + assertTrue(cc.replacesDecoRuleSet) + assertTrue(cc.getDecoRuleSet.isDefined) } @Test @@ -181,6 +188,68 @@ class CalciteConfigBuilderTest { } @Test + def testReplaceDecorationRules(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() + .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) + .build() + + assertEquals(true, cc.replacesDecoRuleSet) + assertTrue(cc.getDecoRuleSet.isDefined) + val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet + assertEquals(1, cSet.size) + assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE)) + } + + @Test + def testReplaceDecorationAddRules(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() + .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) + .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE)) + .build() + + assertEquals(true, cc.replacesDecoRuleSet) + assertTrue(cc.getDecoRuleSet.isDefined) + val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet + assertEquals(2, cSet.size) + assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE)) + assertTrue(cSet.contains(ReduceExpressionsRule.PROJECT_INSTANCE)) + } + + @Test + def 
testAddDecorationRules(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() + .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) + .build() + + assertEquals(false, cc.replacesDecoRuleSet) + assertTrue(cc.getDecoRuleSet.isDefined) + val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet + assertEquals(1, cSet.size) + assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE)) + } + + @Test + def testAddAddDecorationRules(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() + .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) + .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE, + ReduceExpressionsRule.CALC_INSTANCE)) + .build() + + assertEquals(false, cc.replacesDecoRuleSet) + assertTrue(cc.getDecoRuleSet.isDefined) + val cList = cc.getDecoRuleSet.get.iterator().asScala.toList + assertEquals(3, cList.size) + assertEquals(cList.head, ReduceExpressionsRule.FILTER_INSTANCE) + assertEquals(cList(1), ReduceExpressionsRule.PROJECT_INSTANCE) + assertEquals(cList(2), ReduceExpressionsRule.CALC_INSTANCE) + } + + @Test def testDefaultOperatorTable(): Unit = { val cc: CalciteConfig = new CalciteConfigBuilder()
