This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2328ff3 [FLINK-11715][table-planner-blink] Add optimize program to organize optimization phases.
2328ff3 is described below
commit 2328ff3a45b889f5bf2c4e8873944980cd904721
Author: godfreyhe <[email protected]>
AuthorDate: Tue Feb 26 13:48:41 2019 +0800
[FLINK-11715][table-planner-blink] Add optimize program to organize optimization phases.
This closes #7834
---
.../optimize/program/BatchOptimizeContext.scala | 26 +++
.../optimize/program/FlinkChainedProgram.scala | 166 +++++++++++++++++
.../plan/optimize/program/FlinkGroupProgram.scala | 107 +++++++++++
.../plan/optimize/program/FlinkHepProgram.scala | 98 +++++++++++
.../optimize/program/FlinkHepRuleSetProgram.scala | 196 +++++++++++++++++++++
.../optimize/program/FlinkOptimizeContext.scala | 33 ++++
.../optimize/program/FlinkOptimizeProgram.scala | 36 ++++
.../optimize/program/FlinkRuleSetProgram.scala | 77 ++++++++
.../optimize/program/FlinkVolcanoProgram.scala | 122 +++++++++++++
.../optimize/program/StreamOptimizeContext.scala | 26 +++
.../org/apache/flink/table/util/Logging.scala | 28 +++
.../optimize/program/FlinkChainedProgramTest.scala | 157 +++++++++++++++++
.../program/FlinkHepRuleSetProgramTest.scala | 103 +++++++++++
.../optimize/program/FlinkVolcanoProgramTest.scala | 51 ++++++
14 files changed, 1226 insertions(+)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/BatchOptimizeContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/BatchOptimizeContext.scala
new file mode 100644
index 0000000..cfa409e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/BatchOptimizeContext.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+/**
+ * A BatchOptimizeContext allows obtaining batch table environment information when optimizing.
+ */
+trait BatchOptimizeContext extends FlinkOptimizeContext {
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkChainedProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkChainedProgram.scala
new file mode 100644
index 0000000..b389e7e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkChainedProgram.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+
+/**
+ * A FlinkChainedProgram contains a sequence of [[FlinkOptimizeProgram]]s which are chained
+ * together.
+ *
+ * The chained order of the programs can be adjusted by the [[addFirst]], [[addLast]],
+ * [[addBefore]] and [[remove]] methods.
+ *
+ * When the [[optimize]] method is called, each program's optimize method is called in sequence.
+ *
+ * @tparam OC OptimizeContext
+ */
+class FlinkChainedProgram[OC <: FlinkOptimizeContext]
+ extends FlinkOptimizeProgram[OC]
+ with Logging {
+
+ // keep programs in chained order
+ private val programNames = new util.ArrayList[String]()
+ // map program name to program instance
+ private val programMap = new util.HashMap[String, FlinkOptimizeProgram[OC]]()
+
+ /**
+ * Calls each program's optimize method in sequence.
+ */
+ def optimize(root: RelNode, context: OC): RelNode = {
+ programNames.foldLeft(root) {
+ (input, name) =>
+ val program = get(name).getOrElse(throw new TableException(s"This should not happen."))
+
+ val start = System.currentTimeMillis()
+ val result = program.optimize(input, context)
+ val end = System.currentTimeMillis()
+
+ if (LOG.isDebugEnabled) {
+ LOG.debug(s"optimize $name cost ${end - start} ms.\n" +
+ s"optimize result: \n${RelOptUtil.toString(result)}")
+ }
+
+ result
+ }
+ }
+
+ /**
+ * Gets the program associated with the given name. If not found, returns [[None]].
+ */
+ def get(name: String): Option[FlinkOptimizeProgram[OC]] = Option.apply(programMap.get(name))
+
+ /**
+ * Gets the FlinkRuleSetProgram associated with the given name. If the program is not found
+ * or is not a [[FlinkRuleSetProgram]], returns [[None]].
+ * This method is mainly used for updating rules in a FlinkRuleSetProgram of an existing
+ * FlinkChainedProgram instance.
+ */
+ def getFlinkRuleSetProgram(name: String): Option[FlinkRuleSetProgram[OC]] = {
+ get(name).getOrElse(None) match {
+ case p: FlinkRuleSetProgram[OC] => Some(p)
+ case _ => None
+ }
+ }
+
+ /**
+ * Appends the specified program to the end of program collection.
+ *
+ * @return false if program collection contains the specified program; otherwise true.
+ */
+ def addLast(name: String, program: FlinkOptimizeProgram[OC]): Boolean = {
+ Preconditions.checkNotNull(name)
+ Preconditions.checkNotNull(program)
+
+ if (programNames.contains(name)) {
+ false
+ } else {
+ programNames.add(name)
+ programMap.put(name, program)
+ true
+ }
+ }
+
+ /**
+ * Inserts the specified program to the beginning of program collection.
+ *
+ * @return false if program collection contains the specified program; otherwise true.
+ */
+ def addFirst(name: String, program: FlinkOptimizeProgram[OC]): Boolean = {
+ Preconditions.checkNotNull(name)
+ Preconditions.checkNotNull(program)
+
+ if (programNames.contains(name)) {
+ false
+ } else {
+ programNames.add(0, name)
+ programMap.put(name, program)
+ true
+ }
+ }
+
+ /**
+ * Inserts the specified program before `nameOfBefore`.
+ *
+ * @return false if program collection contains the specified program or does not contain `nameOfBefore`; otherwise true.
+ */
+ def addBefore(nameOfBefore: String, name: String, program: FlinkOptimizeProgram[OC]): Boolean = {
+ Preconditions.checkNotNull(nameOfBefore)
+ Preconditions.checkNotNull(name)
+ Preconditions.checkNotNull(program)
+
+ if (programNames.contains(name) || !programNames.contains(nameOfBefore)) {
+ false
+ } else if (programNames.isEmpty) {
+ addLast(name, program)
+ } else {
+ val index = programNames.indexOf(nameOfBefore)
+ programNames.add(index, name)
+ programMap.put(name, program)
+ true
+ }
+ }
+
+ /**
+ * Removes program associated with the given name from program collection.
+ *
+ * @return The removed program associated with the given name. If not found, returns [[None]].
+ */
+ def remove(name: String): Option[FlinkOptimizeProgram[OC]] = {
+ programNames.remove(name)
+ Option.apply(programMap.remove(name))
+ }
+
+ /**
+ * Returns the program names in chained order.
+ */
+ def getProgramNames: util.List[String] = new util.ArrayList[String](programNames)
+
+}
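
[Editor note] For illustration, a minimal sketch of chaining two programs. The program names and rule choices are illustrative, and `root: RelNode` plus `context: BatchOptimizeContext` are assumed to be given; this is not part of the commit itself:

    // build a chain: subquery rewriting first, then expression reduction
    val chain = new FlinkChainedProgram[BatchOptimizeContext]()
    chain.addLast("subquery_rewrite", FlinkHepRuleSetProgramBuilder.newBuilder[BatchOptimizeContext]
      .add(RuleSets.ofList(SubQueryRemoveRule.FILTER))
      .build())
    chain.addLast("reduce_expressions", FlinkHepRuleSetProgramBuilder.newBuilder[BatchOptimizeContext]
      .add(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
      .build())
    // each program's optimize method runs in chained order
    val optimized = chain.optimize(root, context)
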
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkGroupProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkGroupProgram.scala
new file mode 100644
index 0000000..9360e31
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkGroupProgram.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * A FlinkOptimizeProgram that contains a sequence of sub-[[FlinkOptimizeProgram]]s as a group.
+ * Programs in the group will be executed in sequence,
+ * and the group will be executed `iterations` times.
+ *
+ * @tparam OC OptimizeContext
+ */
+class FlinkGroupProgram[OC <: FlinkOptimizeContext] extends FlinkOptimizeProgram[OC] with Logging {
+
+ /**
+ * Sub-programs in this program.
+ */
+ private val programs = new util.ArrayList[(FlinkOptimizeProgram[OC], String)]()
+
+ /**
+ * The number of times the sub-programs are executed as a group.
+ */
+ private var iterations = 1
+
+ override def optimize(root: RelNode, context: OC): RelNode = {
+ if (programs.isEmpty) {
+ return root
+ }
+
+ (0 until iterations).foldLeft(root) {
+ case (input, i) =>
+ if (LOG.isDebugEnabled) {
+ LOG.debug(s"iteration: ${i + 1}")
+ }
+ programs.foldLeft(input) {
+ case (currentInput, (program, description)) =>
+ val start = System.currentTimeMillis()
+ val result = program.optimize(currentInput, context)
+ val end = System.currentTimeMillis()
+
+ if (LOG.isDebugEnabled) {
+ LOG.debug(s"optimize $description cost ${end - start} ms.\n" +
+ s"optimize result:\n ${RelOptUtil.toString(result)}")
+ }
+ result
+ }
+ }
+ }
+
+ def addProgram(program: FlinkOptimizeProgram[OC], description: String = ""): Unit = {
+ Preconditions.checkNotNull(program)
+ val desc = if (description != null) description else ""
+ programs.add((program, desc))
+ }
+
+ def setIterations(iterations: Int): Unit = {
+ Preconditions.checkArgument(iterations > 0)
+ this.iterations = iterations
+ }
+}
+
+class FlinkGroupProgramBuilder[OC <: FlinkOptimizeContext] {
+ private val groupProgram = new FlinkGroupProgram[OC]
+
+ def addProgram(
+ program: FlinkOptimizeProgram[OC], description: String = ""): FlinkGroupProgramBuilder[OC] = {
+ groupProgram.addProgram(program, description)
+ this
+ }
+
+ def setIterations(iterations: Int): FlinkGroupProgramBuilder[OC] = {
+ groupProgram.setIterations(iterations)
+ this
+ }
+
+ def build(): FlinkGroupProgram[OC] = groupProgram
+
+}
+
+object FlinkGroupProgramBuilder {
+ def newBuilder[OC <: FlinkOptimizeContext] = new FlinkGroupProgramBuilder[OC]
+}
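
[Editor note] A hedged sketch of the group semantics: the sub-program and rule choices are illustrative, and `root`/`context` are assumed given:

    // a hypothetical hep sub-program used inside the group
    val filterPushDown = FlinkHepRuleSetProgramBuilder.newBuilder[StreamOptimizeContext]
      .add(RuleSets.ofList(FilterJoinRule.FILTER_ON_JOIN))
      .build()
    // run the sub-programs in sequence, and repeat the whole group 3 times
    val group = FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext]
      .addProgram(filterPushDown, "filter push-down")
      .setIterations(3)
      .build()
    val result = group.optimize(root, context)
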
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkHepProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkHepProgram.scala
new file mode 100644
index 0000000..10d90f4
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkHepProgram.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.util.Preconditions
+
+import org.apache.calcite.plan.RelTrait
+import org.apache.calcite.plan.hep.{HepPlanner, HepProgram}
+import org.apache.calcite.rel.RelNode
+
+/**
+ * A FlinkOptimizeProgram that runs with [[HepPlanner]].
+ *
+ * <p>In most cases, [[FlinkHepRuleSetProgram]] meets our requirements.
+ * Otherwise we can choose this program for some advanced features,
+ * and use [[org.apache.calcite.plan.hep.HepProgramBuilder]] to create the [[HepProgram]].
+ *
+ * @tparam OC OptimizeContext
+ */
+class FlinkHepProgram[OC <: FlinkOptimizeContext] extends FlinkOptimizeProgram[OC] {
+
+ /**
+ * [[HepProgram]] instance for the [[HepPlanner]];
+ * this must not be None when optimizing.
+ */
+ private var hepProgram: Option[HepProgram] = None
+
+ /**
+ * Requested root traits; this is an optional item.
+ */
+ private var requestedRootTraits: Option[Array[RelTrait]] = None
+
+ override def optimize(root: RelNode, context: OC): RelNode = {
+ if (hepProgram.isEmpty) {
+ throw new TableException("hepProgram should not be None in FlinkHepProgram")
+ }
+
+ val planner = new HepPlanner(hepProgram.get, context)
+ planner.setRoot(root)
+
+ if (requestedRootTraits.isDefined) {
+ val targetTraitSet = root.getTraitSet.plusAll(requestedRootTraits.get)
+ if (!root.getTraitSet.equals(targetTraitSet)) {
+ planner.changeTraits(root, targetTraitSet.simplify)
+ }
+ }
+
+ planner.findBestExp
+ }
+
+ /**
+ * Sets hep program instance.
+ */
+ def setHepProgram(hepProgram: HepProgram): Unit = {
+ Preconditions.checkNotNull(hepProgram)
+ this.hepProgram = Some(hepProgram)
+ }
+
+ /**
+ * Sets requested root traits.
+ */
+ def setRequestedRootTraits(relTraits: Array[RelTrait]): Unit = {
+ requestedRootTraits = Option.apply(relTraits)
+ }
+
+}
+
+object FlinkHepProgram {
+
+ def apply[OC <: FlinkOptimizeContext](
+ hepProgram: HepProgram,
+ requestedRootTraits: Option[Array[RelTrait]] = None): FlinkHepProgram[OC] = {
+
+ val flinkHepProgram = new FlinkHepProgram[OC]()
+ flinkHepProgram.setHepProgram(hepProgram)
+ if (requestedRootTraits.isDefined) {
+ flinkHepProgram.setRequestedRootTraits(requestedRootTraits.get)
+ }
+ flinkHepProgram
+ }
+}
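
[Editor note] For the advanced case, a sketch that builds a Calcite HepProgram by hand and wraps it, mirroring the tests later in this patch; `root` and `context` are assumed given:

    val hepBuilder = new HepProgramBuilder()
    hepBuilder
      .addMatchOrder(HepMatchOrder.BOTTOM_UP)
      .addMatchLimit(100)
      .addRuleInstance(SubQueryRemoveRule.FILTER)
    val program = FlinkHepProgram[BatchOptimizeContext](hepBuilder.build())
    val result = program.optimize(root, context)
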
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkHepRuleSetProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkHepRuleSetProgram.scala
new file mode 100644
index 0000000..a5024a7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkHepRuleSetProgram.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.flink.table.plan.optimize.program.HEP_RULES_EXECUTION_TYPE.HEP_RULES_EXECUTION_TYPE
+import org.apache.flink.util.Preconditions
+
+import org.apache.calcite.plan.RelTrait
+import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.tools.RuleSet
+
+import scala.collection.JavaConversions._
+
+/**
+ * A FlinkRuleSetProgram that runs with [[HepPlanner]].
+ *
+ * <p>In most cases this program meets our requirements; otherwise we can choose
+ * [[FlinkHepProgram]] for some advanced features.
+ *
+ * <p>Currently, the default hep execution type is [[HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE]].
+ * (Please refer to [[HEP_RULES_EXECUTION_TYPE]] for more info about execution types.)
+ *
+ * @tparam OC OptimizeContext
+ */
+class FlinkHepRuleSetProgram[OC <: FlinkOptimizeContext] extends FlinkRuleSetProgram[OC] {
+
+ /**
+ * The order of graph traversal when looking for rule matches,
+ * default match order is ARBITRARY.
+ */
+ private var matchOrder: HepMatchOrder = HepMatchOrder.ARBITRARY
+
+ /**
+ * The limit of pattern matches for this program,
+ * default match limit is Integer.MAX_VALUE.
+ */
+ private var matchLimit: Int = Integer.MAX_VALUE
+
+ /**
+ * Hep rule execution type. This is a required item,
+ * default execution type is RULE_SEQUENCE.
+ */
+ private var executionType: HEP_RULES_EXECUTION_TYPE = HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE
+
+ /**
+ * Requested root traits, this is an optional item.
+ */
+ private var requestedRootTraits: Option[Array[RelTrait]] = None
+
+ override def optimize(input: RelNode, context: OC): RelNode = {
+ if (rules.isEmpty) {
+ return input
+ }
+
+ // build HepProgram
+ val builder = new HepProgramBuilder
+ builder.addMatchOrder(matchOrder)
+ builder.addMatchLimit(matchLimit)
+ executionType match {
+ case HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE =>
+ rules.foreach(builder.addRuleInstance)
+ case HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION =>
+ builder.addRuleCollection(rules)
+ case _ =>
+ throw new RuntimeException(s"Unsupported HEP_RULES_EXECUTION_TYPE: $executionType")
+ }
+
+ // optimize with HepProgram
+ val flinkHepProgram = FlinkHepProgram[OC](builder.build(), requestedRootTraits)
+ flinkHepProgram.optimize(input, context)
+ }
+
+ /**
+ * Sets rules match order.
+ */
+ def setHepMatchOrder(matchOrder: HepMatchOrder): Unit = {
+ this.matchOrder = Preconditions.checkNotNull(matchOrder)
+ }
+
+ /**
+ * Sets the limit of pattern matches.
+ */
+ def setMatchLimit(matchLimit: Int): Unit = {
+ Preconditions.checkArgument(matchLimit > 0)
+ this.matchLimit = matchLimit
+ }
+
+ /**
+ * Sets hep rule execution type.
+ */
+ def setHepRulesExecutionType(executionType: HEP_RULES_EXECUTION_TYPE): Unit = {
+ this.executionType = Preconditions.checkNotNull(executionType)
+ }
+
+ /**
+ * Sets requested root traits.
+ */
+ def setRequestedRootTraits(relTraits: Array[RelTrait]): Unit = {
+ requestedRootTraits = Option.apply(relTraits)
+ }
+}
+
+/**
+ * An enumeration of hep rule execution types, telling the [[HepPlanner]]
+ * how exactly to execute the rules.
+ */
+object HEP_RULES_EXECUTION_TYPE extends Enumeration {
+ type HEP_RULES_EXECUTION_TYPE = Value
+
+ /**
+ * Rules in RULE_SEQUENCE type are executed with RuleInstance.
+ * RuleInstance is an instruction that matches a specific rule; each rule in the rule
+ * collection is associated with one RuleInstance. Each RuleInstance will be executed
+ * only once according to the order defined by the rule collection, but a rule may be
+ * applied more than once. If arbitrary order is needed, use RULE_COLLECTION instead.
+ *
+ * Please refer to [[HepProgramBuilder#addRuleInstance]] for more info about RuleInstance.
+ */
+ val RULE_SEQUENCE: HEP_RULES_EXECUTION_TYPE.Value = Value
+
+ /**
+ * Rules in RULE_COLLECTION type are executed with RuleCollection.
+ * RuleCollection is an instruction that matches any rules in a given collection.
+ * The order in which the rules within a collection will be attempted is arbitrary,
+ * so if more control is needed, use RULE_SEQUENCE instead.
+ *
+ * Please refer to [[HepProgramBuilder#addRuleCollection]] for more info about RuleCollection.
+ */
+ val RULE_COLLECTION: HEP_RULES_EXECUTION_TYPE.Value = Value
+}
+
+class FlinkHepRuleSetProgramBuilder[OC <: FlinkOptimizeContext] {
+ private val hepRuleSetProgram = new FlinkHepRuleSetProgram[OC]
+
+ def setHepRulesExecutionType(
+ executionType: HEP_RULES_EXECUTION_TYPE): FlinkHepRuleSetProgramBuilder[OC] = {
+ hepRuleSetProgram.setHepRulesExecutionType(executionType)
+ this
+ }
+
+ /**
+ * Sets rules match order.
+ */
+ def setHepMatchOrder(matchOrder: HepMatchOrder): FlinkHepRuleSetProgramBuilder[OC] = {
+ hepRuleSetProgram.setHepMatchOrder(matchOrder)
+ this
+ }
+
+ /**
+ * Sets the limit of pattern matches.
+ */
+ def setMatchLimit(matchLimit: Int): FlinkHepRuleSetProgramBuilder[OC] = {
+ hepRuleSetProgram.setMatchLimit(matchLimit)
+ this
+ }
+
+ /**
+ * Adds rules for this program.
+ */
+ def add(ruleSet: RuleSet): FlinkHepRuleSetProgramBuilder[OC] = {
+ hepRuleSetProgram.add(ruleSet)
+ this
+ }
+
+ /**
+ * Sets requested root traits.
+ */
+ def setRequestedRootTraits(relTraits: Array[RelTrait]): FlinkHepRuleSetProgramBuilder[OC] = {
+ hepRuleSetProgram.setRequestedRootTraits(relTraits)
+ this
+ }
+
+ def build(): FlinkHepRuleSetProgram[OC] = hepRuleSetProgram
+
+}
+
+object FlinkHepRuleSetProgramBuilder {
+ def newBuilder[OC <: FlinkOptimizeContext] = new FlinkHepRuleSetProgramBuilder[OC]
+}
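
[Editor note] A sketch of the builder, mirroring the tests later in this patch; the rule choice is illustrative:

    // RULE_SEQUENCE applies each rule instance once in the listed order;
    // RULE_COLLECTION lets the planner match the collection in arbitrary order.
    val program = FlinkHepRuleSetProgramBuilder.newBuilder[BatchOptimizeContext]
      .add(RuleSets.ofList(
        ReduceExpressionsRule.FILTER_INSTANCE,
        ReduceExpressionsRule.PROJECT_INSTANCE))
      .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
      .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
      .setMatchLimit(1000)
      .build()
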
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeContext.scala
new file mode 100644
index 0000000..4739ae4
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeContext.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.calcite.plan.Context
+import org.apache.calcite.plan.volcano.VolcanoPlanner
+
+/**
+ * A FlinkOptimizeContext allows obtaining table environment information when optimizing.
+ */
+trait FlinkOptimizeContext extends Context {
+
+ /**
+ * Gets the [[VolcanoPlanner]] instance defined in [[org.apache.flink.table.api.TableEnvironment]].
+ */
+ def getVolcanoPlanner: VolcanoPlanner
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeProgram.scala
new file mode 100644
index 0000000..108bb3d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeProgram.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.calcite.rel.RelNode
+
+/**
+ * Like [[org.apache.calcite.tools.Program]], a FlinkOptimizeProgram transforms a relational
+ * expression into another relational expression.
+ *
+ * @tparam OC OptimizeContext
+ */
+trait FlinkOptimizeProgram[OC <: FlinkOptimizeContext] {
+
+ /**
+ * Transforms a relational expression into another relational expression.
+ */
+ def optimize(root: RelNode, context: OC): RelNode
+
+}
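
[Editor note] As an illustration of the contract (not part of this commit), a custom program only needs to implement optimize. This hypothetical no-op just logs the plan, assuming imports of RelNode, RelOptUtil and the Logging trait added below:

    class LogPlanProgram[OC <: FlinkOptimizeContext]
      extends FlinkOptimizeProgram[OC]
      with Logging {

      // returns the input unchanged; a real program would transform it
      override def optimize(root: RelNode, context: OC): RelNode = {
        LOG.debug(s"current plan:\n${RelOptUtil.toString(root)}")
        root
      }
    }
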
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkRuleSetProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkRuleSetProgram.scala
new file mode 100644
index 0000000..87a52eb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkRuleSetProgram.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.flink.util.Preconditions
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.tools.RuleSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * A FlinkOptimizeProgram that transforms a relational expression into
+ * another relational expression with [[RuleSet]].
+ */
+abstract class FlinkRuleSetProgram[OC <: FlinkOptimizeContext] extends FlinkOptimizeProgram[OC] {
+
+ /**
+ * All [[RelOptRule]]s associated with this program for optimization.
+ */
+ protected val rules: util.List[RelOptRule] = new util.ArrayList[RelOptRule]()
+
+ /**
+ * Adds specified rules to this program.
+ */
+ def add(ruleSet: RuleSet): Unit = {
+ Preconditions.checkNotNull(ruleSet)
+ ruleSet.foreach { rule =>
+ if (!contains(rule)) {
+ rules.add(rule)
+ }
+ }
+ }
+
+ /**
+ * Removes specified rules from this program.
+ */
+ def remove(ruleSet: RuleSet): Unit = {
+ Preconditions.checkNotNull(ruleSet)
+ ruleSet.foreach(rules.remove)
+ }
+
+ /**
+ * Removes all rules from this program first, and then adds the specified rules to this program.
+ */
+ def replaceAll(ruleSet: RuleSet): Unit = {
+ Preconditions.checkNotNull(ruleSet)
+ rules.clear()
+ ruleSet.foreach(rules.add)
+ }
+
+ /**
+ * Checks whether this program contains the specified rule.
+ */
+ def contains(rule: RelOptRule): Boolean = {
+ Preconditions.checkNotNull(rule)
+ rules.contains(rule)
+ }
+}
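
[Editor note] The rule operations compose as one would expect; a short sketch given any concrete FlinkRuleSetProgram instance `program` (assumed here, e.g. a FlinkHepRuleSetProgram):

    program.add(RuleSets.ofList(SubQueryRemoveRule.PROJECT))      // duplicates are skipped
    program.remove(RuleSets.ofList(SubQueryRemoveRule.PROJECT))
    program.replaceAll(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
    assert(program.contains(ReduceExpressionsRule.FILTER_INSTANCE))
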
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkVolcanoProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkVolcanoProgram.scala
new file mode 100644
index 0000000..7c85c5e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkVolcanoProgram.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.util.Preconditions
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
+import org.apache.calcite.plan.{RelOptUtil, RelTrait}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.tools.{Programs, RuleSet}
+
+/**
+ * A FlinkRuleSetProgram that runs with [[org.apache.calcite.plan.volcano.VolcanoPlanner]].
+ *
+ * @tparam OC OptimizeContext
+ */
+class FlinkVolcanoProgram[OC <: FlinkOptimizeContext] extends FlinkRuleSetProgram[OC] {
+
+ /**
+ * Required output traits; this must not be None when optimizing.
+ */
+ protected var requiredOutputTraits: Option[Array[RelTrait]] = None
+
+ override def optimize(root: RelNode, context: OC): RelNode = {
+ if (rules.isEmpty) {
+ return root
+ }
+
+ if (requiredOutputTraits.isEmpty) {
+ throw new TableException("Required output traits should not be None in FlinkVolcanoProgram")
+ }
+
+ val targetTraits = root.getTraitSet.plusAll(requiredOutputTraits.get).simplify()
+ // reuse VolcanoPlanner instance defined in context
+ val planner = Preconditions.checkNotNull(context.getVolcanoPlanner)
+ val optProgram = Programs.ofRules(rules)
+
+ try {
+ optProgram.run(
+ planner,
+ root,
+ targetTraits,
+ ImmutableList.of(),
+ ImmutableList.of())
+ } catch {
+ case e: CannotPlanException =>
+ throw new TableException(
+ s"Cannot generate a valid execution plan for the given query: \n\n" +
+ s"${RelOptUtil.toString(root)}\n" +
+ s"This exception indicates that the query uses an unsupported SQL feature.\n" +
+ s"Please check the documentation for the set of currently supported SQL features.")
+ case t: TableException =>
+ throw new TableException(
+ s"Cannot generate a valid execution plan for the given query: \n\n" +
+ s"${RelOptUtil.toString(root)}\n" +
+ s"${t.getMessage}\n" +
+ s"Please check the documentation for the set of currently supported SQL features.")
+ case a: AssertionError =>
+ throw new AssertionError(s"Sql optimization: Assertion error: ${a.getMessage}", a)
+ case r: RuntimeException if r.getCause.isInstanceOf[TableException] =>
+ throw new TableException(
+ s"Sql optimization: Cannot generate a valid execution plan for the given query: \n\n" +
+ s"${RelOptUtil.toString(root)}\n" +
+ s"${r.getMessage}\n" +
+ s"Please check the documentation for the set of currently supported SQL features.")
+ }
+ }
+
+ /**
+ * Sets required output traits.
+ */
+ def setRequiredOutputTraits(relTraits: Array[RelTrait]): Unit = {
+ Preconditions.checkNotNull(relTraits)
+ requiredOutputTraits = Some(relTraits)
+ }
+
+}
+
+class FlinkVolcanoProgramBuilder[OC <: FlinkOptimizeContext] {
+ private val volcanoProgram = new FlinkVolcanoProgram[OC]
+
+ /**
+ * Adds rules for this program.
+ */
+ def add(ruleSet: RuleSet): FlinkVolcanoProgramBuilder[OC] = {
+ volcanoProgram.add(ruleSet)
+ this
+ }
+
+ /**
+ * Sets required output traits.
+ */
+ def setRequiredOutputTraits(relTraits: Array[RelTrait]): FlinkVolcanoProgramBuilder[OC] = {
+ volcanoProgram.setRequiredOutputTraits(relTraits)
+ this
+ }
+
+ def build(): FlinkVolcanoProgram[OC] = volcanoProgram
+
+}
+
+object FlinkVolcanoProgramBuilder {
+ def newBuilder[OC <: FlinkOptimizeContext] = new FlinkVolcanoProgramBuilder[OC]
+}
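
[Editor note] A hedged sketch of the cost-based phase, following the test later in this patch. The TEST convention, `root` and `context` are illustrative; the context must supply the shared VolcanoPlanner via getVolcanoPlanner:

    val TEST = new Convention.Impl("TEST", classOf[RelNode])
    val volcano = FlinkVolcanoProgramBuilder.newBuilder[BatchOptimizeContext]
      .add(RuleSets.ofList(FilterJoinRule.FILTER_ON_JOIN))
      .setRequiredOutputTraits(Array(TEST))  // required before optimize is called
      .build()
    val best = volcano.optimize(root, context)
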
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
new file mode 100644
index 0000000..34fdafb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+/**
+ * A StreamOptimizeContext allows obtaining stream table environment information when optimizing.
+ */
+trait StreamOptimizeContext extends FlinkOptimizeContext {
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/Logging.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/Logging.scala
new file mode 100644
index 0000000..b6be99e6
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/Logging.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util
+
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * Helper trait to ensure the logger is never serialized.
+ */
+trait Logging {
+ @transient lazy val LOG: Logger = LoggerFactory.getLogger(getClass)
+}
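
[Editor note] Mix-in usage is a one-liner; since LOG is @transient and lazy, it is recreated after deserialization instead of being serialized. The class name is illustrative:

    class MyOptimizer extends Logging {
      def run(): Unit = LOG.info("optimizing ...")
    }
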
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkChainedProgramTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkChainedProgramTest.scala
new file mode 100644
index 0000000..5ccebc8
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkChainedProgramTest.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.calcite.plan.Convention
+import org.apache.calcite.plan.hep.{HepMatchOrder, HepProgramBuilder}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.tools.RuleSets
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConversions._
+
+/**
+ * Tests for [[FlinkChainedProgram]].
+ */
+class FlinkChainedProgramTest {
+
+ @Test
+ def testAddGetRemovePrograms(): Unit = {
+ val programs = new FlinkChainedProgram
+ assertTrue(programs.getProgramNames.isEmpty)
+ assertTrue(programs.get("o1").isEmpty)
+
+ // test addFirst
+ val builder = new HepProgramBuilder()
+ builder
+ .addMatchLimit(10)
+ .addMatchOrder(HepMatchOrder.ARBITRARY)
+ .addRuleInstance(SubQueryRemoveRule.FILTER)
+ .addRuleInstance(SubQueryRemoveRule.PROJECT)
+ .addRuleInstance(SubQueryRemoveRule.JOIN)
+ .addMatchLimit(100)
+ .addMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .addRuleCollection(Array(
+ TableScanRule.INSTANCE,
+ ValuesReduceRule.FILTER_INSTANCE
+ ).toList)
+ val program1 = FlinkHepProgram(builder.build())
+ assertTrue(programs.addFirst("o2", program1))
+ assertEquals(List("o2"), programs.getProgramNames.toList)
+ assertTrue(programs.get("o2").isDefined)
+ assertTrue(program1 eq programs.get("o2").get)
+
+ val program2 = FlinkHepRuleSetProgramBuilder.newBuilder
+ .add(RuleSets.ofList(
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.PROJECT_INSTANCE,
+ ReduceExpressionsRule.CALC_INSTANCE,
+ ReduceExpressionsRule.JOIN_INSTANCE
+ )).build()
+ assertTrue(programs.addFirst("o1", program2))
+ assertEquals(List("o1", "o2"), programs.getProgramNames.toList)
+ assertTrue(programs.get("o1").isDefined)
+ assertTrue(program2 eq programs.get("o1").get)
+
+ // test addLast
+ val program3 = FlinkHepRuleSetProgramBuilder.newBuilder
+ .add(RuleSets.ofList(
+ FilterCalcMergeRule.INSTANCE,
+ ProjectCalcMergeRule.INSTANCE,
+ FilterToCalcRule.INSTANCE,
+ ProjectToCalcRule.INSTANCE,
+ CalcMergeRule.INSTANCE))
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+ .setMatchLimit(10000)
+ .setHepMatchOrder(HepMatchOrder.ARBITRARY)
+ .build()
+ assertTrue(programs.addLast("o4", program3))
+ assertEquals(List("o1", "o2", "o4"), programs.getProgramNames.toList)
+ assertTrue(programs.get("o4").isDefined)
+ assertTrue(program3 eq programs.get("o4").get)
+
+ // test addBefore
+ val TEST = new Convention.Impl("TEST", classOf[RelNode])
+ val program4 = FlinkVolcanoProgramBuilder.newBuilder
+ .add(RuleSets.ofList(
+ FilterJoinRule.FILTER_ON_JOIN,
+ FilterJoinRule.JOIN))
+ .setRequiredOutputTraits(Array(TEST))
+ .build()
+ assertTrue(programs.addBefore("o4", "o3", program4))
+ assertEquals(List("o1", "o2", "o3", "o4"), programs.getProgramNames.toList)
+ assertTrue(programs.get("o3").isDefined)
+ assertTrue(program4 eq programs.get("o3").get)
+
+ // test remove
+ val p2 = programs.remove("o2")
+ assertTrue(p2.isDefined)
+ assertTrue(p2.get eq program1)
+ assertEquals(List("o1", "o3", "o4"), programs.getProgramNames.toList)
+ assertTrue(programs.remove("o0").isEmpty)
+ assertEquals(List("o1", "o3", "o4"), programs.getProgramNames.toList)
+
+ // program already exists
+ assertFalse(programs.addFirst("o3", program1))
+ assertFalse(programs.addLast("o4", program1))
+ assertFalse(programs.addBefore("o0", "o4", program1))
+ assertEquals(List("o1", "o3", "o4"), programs.getProgramNames.toList)
+ }
+
+ @Test
+ def testGetFlinkRuleSetProgram(): Unit = {
+ val programs = new FlinkChainedProgram
+ assertTrue(programs.getProgramNames.isEmpty)
+
+ val program1 = FlinkHepRuleSetProgramBuilder.newBuilder
+ .add(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .build()
+ programs.addFirst("o1", program1)
+ assertTrue(programs.get("o1").isDefined)
+ assertTrue(program1 eq programs.get("o1").get)
+
+ val builder = new HepProgramBuilder()
+ builder
+ .addMatchLimit(10)
+ .addRuleInstance(SubQueryRemoveRule.FILTER)
+ .addRuleInstance(SubQueryRemoveRule.JOIN)
+ .addMatchOrder(HepMatchOrder.BOTTOM_UP)
+ val program2 = FlinkHepProgram(builder.build())
+ programs.addLast("o2", program2)
+ assertTrue(programs.get("o2").isDefined)
+ assertTrue(program2 eq programs.get("o2").get)
+ assertTrue(programs.getFlinkRuleSetProgram("o2").isEmpty)
+
+ assertTrue(programs.getFlinkRuleSetProgram("o3").isEmpty)
+
+ val p1 = programs.getFlinkRuleSetProgram("o1")
+ assertTrue(p1.isDefined)
+ p1.get.add(RuleSets.ofList(SubQueryRemoveRule.PROJECT))
+ assertTrue(p1.get eq programs.getFlinkRuleSetProgram("o1").get)
+ }
+
+ @Test(expected = classOf[NullPointerException])
+ def testAddNullProgram(): Unit = {
+ val programs = new FlinkChainedProgram[BatchOptimizeContext]
+ programs.addLast("o1", null)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkHepRuleSetProgramTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkHepRuleSetProgramTest.scala
new file mode 100644
index 0000000..465edfb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkHepRuleSetProgramTest.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.tools.RuleSets
+import org.junit.Assert.{assertFalse, assertTrue}
+import org.junit.Test
+
+/**
+ * Tests for [[FlinkHepRuleSetProgram]].
+ */
+class FlinkHepRuleSetProgramTest {
+
+ @Test
+ def testBuildFlinkHepRuleSetProgram(): Unit = {
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .add(RuleSets.ofList(
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.PROJECT_INSTANCE,
+ ReduceExpressionsRule.CALC_INSTANCE,
+ ReduceExpressionsRule.JOIN_INSTANCE
+ ))
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setMatchLimit(10)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .build()
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testMatchLimitLessThan1(): Unit = {
+ FlinkHepRuleSetProgramBuilder.newBuilder.setMatchLimit(0)
+ }
+
+ @Test(expected = classOf[NullPointerException])
+ def testNullHepMatchOrder(): Unit = {
+ FlinkHepRuleSetProgramBuilder.newBuilder.setHepMatchOrder(null)
+ }
+
+ @Test(expected = classOf[NullPointerException])
+ def testNullHepRulesExecutionType(): Unit = {
+ FlinkHepRuleSetProgramBuilder.newBuilder.setHepRulesExecutionType(null)
+ }
+
+ @Test
+ def testRuleOperations(): Unit = {
+ val program = FlinkHepRuleSetProgramBuilder.newBuilder
+ .add(RuleSets.ofList(
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.PROJECT_INSTANCE,
+ ReduceExpressionsRule.CALC_INSTANCE,
+ ReduceExpressionsRule.JOIN_INSTANCE
+ )).build()
+
+ assertTrue(program.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+ assertTrue(program.contains(ReduceExpressionsRule.PROJECT_INSTANCE))
+ assertTrue(program.contains(ReduceExpressionsRule.CALC_INSTANCE))
+ assertTrue(program.contains(ReduceExpressionsRule.JOIN_INSTANCE))
+ assertFalse(program.contains(SubQueryRemoveRule.FILTER))
+
+ program.remove(RuleSets.ofList(
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.PROJECT_INSTANCE))
+ assertFalse(program.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+ assertFalse(program.contains(ReduceExpressionsRule.PROJECT_INSTANCE))
+ assertTrue(program.contains(ReduceExpressionsRule.CALC_INSTANCE))
+ assertTrue(program.contains(ReduceExpressionsRule.JOIN_INSTANCE))
+
+ program.replaceAll(RuleSets.ofList(SubQueryRemoveRule.FILTER))
+ assertFalse(program.contains(ReduceExpressionsRule.CALC_INSTANCE))
+ assertFalse(program.contains(ReduceExpressionsRule.JOIN_INSTANCE))
+ assertTrue(program.contains(SubQueryRemoveRule.FILTER))
+
+ program.add(RuleSets.ofList(
+ SubQueryRemoveRule.PROJECT,
+ SubQueryRemoveRule.JOIN))
+ assertTrue(program.contains(SubQueryRemoveRule.FILTER))
+ assertTrue(program.contains(SubQueryRemoveRule.PROJECT))
+ assertTrue(program.contains(SubQueryRemoveRule.JOIN))
+ }
+
+ @Test(expected = classOf[NullPointerException])
+ def testNullRuleSets(): Unit = {
+ FlinkHepRuleSetProgramBuilder.newBuilder.add(null)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkVolcanoProgramTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkVolcanoProgramTest.scala
new file mode 100644
index 0000000..31bf32b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkVolcanoProgramTest.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.calcite.plan.Convention
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.tools.RuleSets
+import org.junit.Test
+
+/**
+ * Tests for [[FlinkVolcanoProgram]].
+ */
+class FlinkVolcanoProgramTest {
+
+ @Test
+ def testBuildFlinkVolcanoProgram(): Unit = {
+ val TEST = new Convention.Impl("TEST", classOf[RelNode])
+ FlinkVolcanoProgramBuilder.newBuilder
+ .add(RuleSets.ofList(
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.PROJECT_INSTANCE,
+ ReduceExpressionsRule.CALC_INSTANCE,
+ ReduceExpressionsRule.JOIN_INSTANCE
+ ))
+ .setRequiredOutputTraits(Array(TEST))
+ .build()
+ }
+
+ @Test(expected = classOf[NullPointerException])
+ def testNullRequiredOutputTraits(): Unit = {
+ FlinkVolcanoProgramBuilder.newBuilder.setRequiredOutputTraits(null)
+ }
+
+}