This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2328ff3  [FLINK-11715][table-planner-blink] Add optimize program to 
organize optimization phases.
2328ff3 is described below

commit 2328ff3a45b889f5bf2c4e8873944980cd904721
Author: godfreyhe <[email protected]>
AuthorDate: Tue Feb 26 13:48:41 2019 +0800

    [FLINK-11715][table-planner-blink] Add optimize program to organize 
optimization phases.
    
    This closes #7834
---
 .../optimize/program/BatchOptimizeContext.scala    |  26 +++
 .../optimize/program/FlinkChainedProgram.scala     | 166 +++++++++++++++++
 .../plan/optimize/program/FlinkGroupProgram.scala  | 107 +++++++++++
 .../plan/optimize/program/FlinkHepProgram.scala    |  98 +++++++++++
 .../optimize/program/FlinkHepRuleSetProgram.scala  | 196 +++++++++++++++++++++
 .../optimize/program/FlinkOptimizeContext.scala    |  33 ++++
 .../optimize/program/FlinkOptimizeProgram.scala    |  36 ++++
 .../optimize/program/FlinkRuleSetProgram.scala     |  77 ++++++++
 .../optimize/program/FlinkVolcanoProgram.scala     | 122 +++++++++++++
 .../optimize/program/StreamOptimizeContext.scala   |  26 +++
 .../org/apache/flink/table/util/Logging.scala      |  28 +++
 .../optimize/program/FlinkChainedProgramTest.scala | 157 +++++++++++++++++
 .../program/FlinkHepRuleSetProgramTest.scala       | 103 +++++++++++
 .../optimize/program/FlinkVolcanoProgramTest.scala |  51 ++++++
 14 files changed, 1226 insertions(+)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/BatchOptimizeContext.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/BatchOptimizeContext.scala
new file mode 100644
index 0000000..cfa409e
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/BatchOptimizeContext.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+/**
+  * A BatchOptimizeContext provides access to batch table environment 
information when optimizing.
+  */
+trait BatchOptimizeContext extends FlinkOptimizeContext {
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkChainedProgram.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkChainedProgram.scala
new file mode 100644
index 0000000..b389e7e
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkChainedProgram.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+
+/**
+  * A FlinkOptimizeProgram contains a sequence of [[FlinkOptimizeProgram]]s 
which are chained
+  * together.
+  *
+  * The chained-order of programs can be adjusted by [[addFirst]], 
[[addLast]], [[addBefore]]
+  * and [[remove]] methods.
+  *
+  * When the [[optimize]] method is called, each program's optimize method is 
called in sequence.
+  *
+  * @tparam OC OptimizeContext
+  */
+class FlinkChainedProgram[OC <: FlinkOptimizeContext]
+  extends FlinkOptimizeProgram[OC]
+  with Logging {
+
+  // keep program as ordered
+  private val programNames = new util.ArrayList[String]()
+  // map program name to program instance
+  private val programMap = new util.HashMap[String, FlinkOptimizeProgram[OC]]()
+
+  /**
+    * Calling each program's optimize method in sequence.
+    */
+  def optimize(root: RelNode, context: OC): RelNode = {
+    // Thread the plan through every registered program, in chained order.
+    var current = root
+    programNames.foreach { name =>
+      // Every registered name is expected to have an entry in programMap.
+      val program = get(name).getOrElse(
+        throw new TableException("This should not happen."))
+
+      val startTime = System.currentTimeMillis()
+      current = program.optimize(current, context)
+      val elapsed = System.currentTimeMillis() - startTime
+
+      if (LOG.isDebugEnabled) {
+        LOG.debug(s"optimize $name cost $elapsed ms.\n" +
+          s"optimize result: \n${RelOptUtil.toString(current)}")
+      }
+    }
+    current
+  }
+
+  /**
+    * Gets program associated with the given name. If not found, return 
[[None]].
+    */
+  def get(name: String): Option[FlinkOptimizeProgram[OC]] = 
Option.apply(programMap.get(name))
+
+  /**
+    * Gets FlinkRuleSetProgram associated with the given name. If the program 
is not found or is
+    * not a [[FlinkRuleSetProgram]], return [[None]].
+    * This method is mainly used for updating rules in a FlinkRuleSetProgram of an 
existing
+    * FlinkChainedProgram instance.
+    */
+  def getFlinkRuleSetProgram(name: String): Option[FlinkRuleSetProgram[OC]] = {
+    // Narrow the looked-up program in place instead of unwrapping it with
+    // `getOrElse(None)`: that widened the scrutinee to a top type, which
+    // made the generic type pattern unchecked. Matching on the Option's
+    // element keeps its static type FlinkOptimizeProgram[OC].
+    // A missing program and a program of another kind both yield None.
+    get(name).collect {
+      case ruleSetProgram: FlinkRuleSetProgram[OC] => ruleSetProgram
+    }
+  }
+
+  /**
+    * Appends the specified program to the end of program collection.
+    *
+    * @return false if program collection contains the specified program; 
otherwise true.
+    */
+  def addLast(name: String, program: FlinkOptimizeProgram[OC]): Boolean = {
+    Preconditions.checkNotNull(name)
+    Preconditions.checkNotNull(program)
+
+    if (programNames.contains(name)) {
+      false
+    } else {
+      programNames.add(name)
+      programMap.put(name, program)
+      true
+    }
+  }
+
+  /**
+    * Inserts the specified program to the beginning of program collection.
+    *
+    * @return false if program collection contains the specified program; 
otherwise true.
+    */
+  def addFirst(name: String, program: FlinkOptimizeProgram[OC]): Boolean = {
+    Preconditions.checkNotNull(name)
+    Preconditions.checkNotNull(program)
+
+    if (programNames.contains(name)) {
+      false
+    } else {
+      programNames.add(0, name)
+      programMap.put(name, program)
+      true
+    }
+  }
+
+  /**
+    * Inserts the specified program before `nameOfBefore`.
+    *
+    * @return false if program collection contains the specified program or
+    *         does not contain `nameOfBefore`; otherwise true.
+    */
+  def addBefore(
+      nameOfBefore: String,
+      name: String,
+      program: FlinkOptimizeProgram[OC]): Boolean = {
+    Preconditions.checkNotNull(nameOfBefore)
+    Preconditions.checkNotNull(name)
+    Preconditions.checkNotNull(program)
+
+    // One lookup answers both "is the anchor registered?" and "where?".
+    // An empty collection cannot contain `nameOfBefore`, so it needs no
+    // branch of its own: indexOf yields -1 and the insertion is rejected.
+    val index = programNames.indexOf(nameOfBefore)
+    if (index < 0 || programNames.contains(name)) {
+      false
+    } else {
+      programNames.add(index, name)
+      programMap.put(name, program)
+      true
+    }
+  }
+
+  /**
+    * Removes program associated with the given name from program collection.
+    *
+    * @return The removed program associated with the given name. If not 
found, return [[None]].
+    */
+  def remove(name: String): Option[FlinkOptimizeProgram[OC]] = {
+    programNames.remove(name)
+    Option.apply(programMap.remove(name))
+  }
+
+  /**
+    * Returns program names with chained order.
+    */
+  def getProgramNames: util.List[String] = new 
util.ArrayList[String](programNames)
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkGroupProgram.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkGroupProgram.scala
new file mode 100644
index 0000000..9360e31
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkGroupProgram.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.flink.table.util.Logging
+import org.apache.flink.util.Preconditions
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * A FlinkOptimizeProgram that contains a sequence of 
sub-[[FlinkOptimizeProgram]]s as a group.
+  * Programs in the group will be executed in sequence,
+  * and the group will be executed `iterations` times.
+  *
+  * @tparam OC OptimizeContext
+  */
+class FlinkGroupProgram[OC <: FlinkOptimizeContext] extends 
FlinkOptimizeProgram[OC] with Logging {
+
+  /**
+    * Sub-programs in this program.
+    */
+  private val programs = new util.ArrayList[(FlinkOptimizeProgram[OC], 
String)]()
+
+  /**
+    * Repeat execution times for sub-programs as a group.
+    */
+  private var iterations = 1
+
+  override def optimize(root: RelNode, context: OC): RelNode = {
+    if (programs.isEmpty) {
+      return root
+    }
+
+    (0 until iterations).foldLeft(root) {
+      case (input, i) =>
+        if (LOG.isDebugEnabled) {
+          LOG.debug(s"iteration: ${i + 1}")
+        }
+        programs.foldLeft(input) {
+          case (currentInput, (program, description)) =>
+            val start = System.currentTimeMillis()
+            val result = program.optimize(currentInput, context)
+            val end = System.currentTimeMillis()
+
+            if (LOG.isDebugEnabled) {
+              LOG.debug(s"optimize $description cost ${end - start} ms.\n" +
+                s"optimize result:\n ${RelOptUtil.toString(result)}")
+            }
+            result
+        }
+    }
+  }
+
+  def addProgram(
+      program: FlinkOptimizeProgram[OC],
+      description: String = ""): Unit = {
+    Preconditions.checkNotNull(program)
+    // A null description is normalized to the empty string.
+    val normalized = Option(description).getOrElse("")
+    programs.add((program, normalized))
+  }
+
+  def setIterations(iterations: Int): Unit = {
+    Preconditions.checkArgument(iterations > 0)
+    this.iterations = iterations
+  }
+}
+
+class FlinkGroupProgramBuilder[OC <: FlinkOptimizeContext] {
+  private val groupProgram = new FlinkGroupProgram[OC]
+
+  def addProgram(
+      program: FlinkOptimizeProgram[OC], description: String = ""): 
FlinkGroupProgramBuilder[OC] = {
+    groupProgram.addProgram(program, description)
+    this
+  }
+
+  def setIterations(iterations: Int): FlinkGroupProgramBuilder[OC] = {
+    groupProgram.setIterations(iterations)
+    this
+  }
+
+  def build(): FlinkGroupProgram[OC] = groupProgram
+
+}
+
+object FlinkGroupProgramBuilder {
+
+  /**
+    * Creates a new [[FlinkGroupProgramBuilder]].
+    *
+    * The result type is declared explicitly so that the public signature
+    * does not depend on type inference.
+    *
+    * @tparam OC OptimizeContext
+    */
+  def newBuilder[OC <: FlinkOptimizeContext]: FlinkGroupProgramBuilder[OC] =
+    new FlinkGroupProgramBuilder[OC]
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkHepProgram.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkHepProgram.scala
new file mode 100644
index 0000000..10d90f4
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkHepProgram.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.util.Preconditions
+
+import org.apache.calcite.plan.RelTrait
+import org.apache.calcite.plan.hep.{HepPlanner, HepProgram}
+import org.apache.calcite.rel.RelNode
+
+/**
+  * A FlinkOptimizeProgram that runs with [[HepPlanner]].
+  *
+  * <p>In most cases, [[FlinkHepRuleSetProgram]] meets our requirements.
+  * Otherwise we could choose this program for some advanced features,
+  * and use [[org.apache.calcite.plan.hep.HepProgramBuilder]] to create 
[[HepProgram]].
+  *
+  * @tparam OC OptimizeContext
+  */
+class FlinkHepProgram[OC <: FlinkOptimizeContext] extends 
FlinkOptimizeProgram[OC] {
+
+  /**
+    * [[HepProgram]] instance for [[HepPlanner]],
+    * this must not be None when doing optimize.
+    */
+  private var hepProgram: Option[HepProgram] = None
+
+  /**
+    * Requested root traits, it's an optional item.
+    */
+  private var requestedRootTraits: Option[Array[RelTrait]] = None
+
+  override def optimize(root: RelNode, context: OC): RelNode = {
+    // A HepProgram is mandatory; fail before building the planner if unset.
+    val program = hepProgram.getOrElse(
+      throw new TableException(
+        "hepProgram should not be None in FlinkHepProgram"))
+
+    val planner = new HepPlanner(program, context)
+    planner.setRoot(root)
+
+    // Request a trait change only when the requested traits actually differ
+    // from the traits the root already carries.
+    requestedRootTraits.foreach { traits =>
+      val currentTraitSet = root.getTraitSet
+      val targetTraitSet = currentTraitSet.plusAll(traits)
+      if (!currentTraitSet.equals(targetTraitSet)) {
+        planner.changeTraits(root, targetTraitSet.simplify)
+      }
+    }
+
+    planner.findBestExp
+  }
+
+  /**
+    * Sets hep program instance.
+    */
+  def setHepProgram(hepProgram: HepProgram): Unit = {
+    Preconditions.checkNotNull(hepProgram)
+    this.hepProgram = Some(hepProgram)
+  }
+
+  /**
+    * Sets requested root traits.
+    */
+  def setRequestedRootTraits(relTraits: Array[RelTrait]): Unit = {
+    requestedRootTraits = Option.apply(relTraits)
+  }
+
+}
+
+object FlinkHepProgram {
+
+  /**
+    * Creates a [[FlinkHepProgram]] configured with the given [[HepProgram]]
+    * and, when provided, the requested root traits.
+    *
+    * @param hepProgram          program passed to `setHepProgram`, which
+    *                            null-checks it.
+    * @param requestedRootTraits optional root traits; nothing is set for None.
+    * @tparam OC OptimizeContext
+    */
+  def apply[OC <: FlinkOptimizeContext](
+      hepProgram: HepProgram,
+      requestedRootTraits: Option[Array[RelTrait]] = None): FlinkHepProgram[OC] = {
+
+    val program = new FlinkHepProgram[OC]()
+    program.setHepProgram(hepProgram)
+    requestedRootTraits.foreach(traits => program.setRequestedRootTraits(traits))
+    program
+  }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkHepRuleSetProgram.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkHepRuleSetProgram.scala
new file mode 100644
index 0000000..a5024a7
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkHepRuleSetProgram.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import 
org.apache.flink.table.plan.optimize.program.HEP_RULES_EXECUTION_TYPE.HEP_RULES_EXECUTION_TYPE
+import org.apache.flink.util.Preconditions
+
+import org.apache.calcite.plan.RelTrait
+import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, 
HepProgramBuilder}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.tools.RuleSet
+
+import scala.collection.JavaConversions._
+
+/**
+  * A FlinkRuleSetProgram that runs with [[HepPlanner]].
+  *
+  * <p>In most cases this program meets our requirements; otherwise we 
can choose
+  * [[FlinkHepProgram]] for some advanced features.
+  *
+  * <p>Currently, default hep execution type is 
[[HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE]].
+  * (Please refer to [[HEP_RULES_EXECUTION_TYPE]] for more info about 
execution types)
+  *
+  * @tparam OC OptimizeContext
+  */
+class FlinkHepRuleSetProgram[OC <: FlinkOptimizeContext] extends 
FlinkRuleSetProgram[OC] {
+
+  /**
+    * The order of graph traversal when looking for rule matches,
+    * default match order is ARBITRARY.
+    */
+  private var matchOrder: HepMatchOrder = HepMatchOrder.ARBITRARY
+
+  /**
+    * The limit of pattern matches for this program,
+    * default match limit is Integer.MAX_VALUE.
+    */
+  private var matchLimit: Int = Integer.MAX_VALUE
+
+  /**
+    * Hep rule execution type. This is a required item,
+    * default execution type is RULE_SEQUENCE.
+    */
+  private var executionType: HEP_RULES_EXECUTION_TYPE = 
HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE
+
+  /**
+    * Requested root traits, this is an optional item.
+    */
+  private var requestedRootTraits: Option[Array[RelTrait]] = None
+
+  override def optimize(input: RelNode, context: OC): RelNode = {
+    if (rules.isEmpty) {
+      return input
+    }
+
+    // build HepProgram
+    val builder = new HepProgramBuilder
+    builder.addMatchOrder(matchOrder)
+    builder.addMatchLimit(matchLimit)
+    executionType match {
+      case HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE =>
+        rules.foreach(builder.addRuleInstance)
+      case HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION =>
+        builder.addRuleCollection(rules)
+      case _ =>
+        throw new RuntimeException(s"Unsupported HEP_RULES_EXECUTION_TYPE: 
$executionType")
+    }
+
+    // optimize with HepProgram
+    val flinkHepProgram = FlinkHepProgram[OC](builder.build(), 
requestedRootTraits)
+    flinkHepProgram.optimize(input, context)
+  }
+
+  /**
+    * Sets rules match order.
+    */
+  def setHepMatchOrder(matchOrder: HepMatchOrder): Unit = {
+    this.matchOrder = Preconditions.checkNotNull(matchOrder)
+  }
+
+  /**
+    * Sets the limit of pattern matches.
+    */
+  def setMatchLimit(matchLimit: Int): Unit = {
+    Preconditions.checkArgument(matchLimit > 0)
+    this.matchLimit = matchLimit
+  }
+
+  /**
+    * Sets hep rule execution type.
+    */
+  def setHepRulesExecutionType(executionType: HEP_RULES_EXECUTION_TYPE): Unit 
= {
+    this.executionType = Preconditions.checkNotNull(executionType)
+  }
+
+  /**
+    * Sets requested root traits.
+    */
+  def setRequestedRootTraits(relTraits: Array[RelTrait]): Unit = {
+    requestedRootTraits = Option.apply(relTraits)
+  }
+}
+
+/**
+  * An enumeration of hep rule execution types, which tell the [[HepPlanner]]
+  * exactly how to execute the rules.
+  */
+object HEP_RULES_EXECUTION_TYPE extends Enumeration {
+  type HEP_RULES_EXECUTION_TYPE = Value
+
+  /**
+    * Rules in RULE_SEQUENCE type are executed with RuleInstance.
+    * RuleInstance is an instruction that matches a specific rule, each rule 
in the rule
+    * collection is associated with one RuleInstance. Each RuleInstance will 
be executed
+    * only once according to the order defined by the rule collection, but a 
rule may be applied
+    * more than once. If arbitrary order is needed, use RULE_COLLECTION 
instead.
+    *
+    * Please refer to [[HepProgramBuilder#addRuleInstance]] for more info 
about RuleInstance.
+    */
+  val RULE_SEQUENCE: HEP_RULES_EXECUTION_TYPE.Value = Value
+
+  /**
+    * Rules in RULE_COLLECTION type are executed with RuleCollection.
+    * RuleCollection is an instruction that matches any rules in a given 
collection.
+    * The order in which the rules within a collection will be attempted is 
arbitrary,
+    * so if more control is needed, use RULE_SEQUENCE instead.
+    *
+    * Please refer to [[HepProgramBuilder#addRuleCollection]] for more info 
about RuleCollection.
+    */
+  val RULE_COLLECTION: HEP_RULES_EXECUTION_TYPE.Value = Value
+}
+
+class FlinkHepRuleSetProgramBuilder[OC <: FlinkOptimizeContext] {
+  private val hepRuleSetProgram = new FlinkHepRuleSetProgram[OC]
+
+  def setHepRulesExecutionType(
+      executionType: HEP_RULES_EXECUTION_TYPE): 
FlinkHepRuleSetProgramBuilder[OC] = {
+    hepRuleSetProgram.setHepRulesExecutionType(executionType)
+    this
+  }
+
+  /**
+    * Sets rules match order.
+    */
+  def setHepMatchOrder(matchOrder: HepMatchOrder): 
FlinkHepRuleSetProgramBuilder[OC] = {
+    hepRuleSetProgram.setHepMatchOrder(matchOrder)
+    this
+  }
+
+  /**
+    * Sets the limit of pattern matches.
+    */
+  def setMatchLimit(matchLimit: Int): FlinkHepRuleSetProgramBuilder[OC] = {
+    hepRuleSetProgram.setMatchLimit(matchLimit)
+    this
+  }
+
+  /**
+    * Adds rules for this program.
+    */
+  def add(ruleSet: RuleSet): FlinkHepRuleSetProgramBuilder[OC] = {
+    hepRuleSetProgram.add(ruleSet)
+    this
+  }
+
+  /**
+    * Sets requested root traits.
+    */
+  def setRequestedRootTraits(relTraits: Array[RelTrait]): 
FlinkHepRuleSetProgramBuilder[OC] = {
+    hepRuleSetProgram.setRequestedRootTraits(relTraits)
+    this
+  }
+
+  def build(): FlinkHepRuleSetProgram[OC] = hepRuleSetProgram
+
+}
+
+object FlinkHepRuleSetProgramBuilder {
+
+  /**
+    * Creates a new [[FlinkHepRuleSetProgramBuilder]].
+    *
+    * The result type is declared explicitly so that the public signature
+    * does not depend on type inference.
+    *
+    * @tparam OC OptimizeContext
+    */
+  def newBuilder[OC <: FlinkOptimizeContext]: FlinkHepRuleSetProgramBuilder[OC] =
+    new FlinkHepRuleSetProgramBuilder[OC]
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeContext.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeContext.scala
new file mode 100644
index 0000000..4739ae4
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeContext.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.calcite.plan.Context
+import org.apache.calcite.plan.volcano.VolcanoPlanner
+
+/**
+  * A FlinkOptimizeContext provides access to table environment information when 
optimizing.
+  */
+trait FlinkOptimizeContext extends Context {
+
+  /**
+    * Gets [[VolcanoPlanner]] instance defined in 
[[org.apache.flink.table.api.TableEnvironment]].
+    */
+  def getVolcanoPlanner: VolcanoPlanner
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeProgram.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeProgram.scala
new file mode 100644
index 0000000..108bb3d
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeProgram.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.calcite.rel.RelNode
+
+/**
+  * Like [[org.apache.calcite.tools.Program]], a FlinkOptimizeProgram 
transforms a relational
+  * expression into another relational expression.
+  *
+  * @tparam OC OptimizeContext
+  */
+trait FlinkOptimizeProgram[OC <: FlinkOptimizeContext] {
+
+  /**
+    * Transforms a relational expression into another relational expression.
+    */
+  def optimize(root: RelNode, context: OC): RelNode
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkRuleSetProgram.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkRuleSetProgram.scala
new file mode 100644
index 0000000..87a52eb
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkRuleSetProgram.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.flink.util.Preconditions
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.tools.RuleSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * A FlinkOptimizeProgram that transforms a relational expression into
+  * another relational expression with [[RuleSet]].
+  */
+abstract class FlinkRuleSetProgram[OC <: FlinkOptimizeContext] extends 
FlinkOptimizeProgram[OC] {
+
+  /**
+    * All [[RelOptRule]]s for optimizing associated with this program.
+    */
+  protected val rules: util.List[RelOptRule] = new util.ArrayList[RelOptRule]()
+
+  /**
+    * Adds specified rules to this program.
+    */
+  def add(ruleSet: RuleSet): Unit = {
+    Preconditions.checkNotNull(ruleSet)
+    ruleSet.foreach { rule =>
+      if (!contains(rule)) {
+        rules.add(rule)
+      }
+    }
+  }
+
+  /**
+    * Removes specified rules from this program.
+    */
+  def remove(ruleSet: RuleSet): Unit = {
+    Preconditions.checkNotNull(ruleSet)
+    ruleSet.foreach(rules.remove)
+  }
+
+  /**
+    * Removes all rules from this program first, and then adds specified rules 
to this program.
+    */
+  def replaceAll(ruleSet: RuleSet): Unit = {
+    Preconditions.checkNotNull(ruleSet)
+    rules.clear()
+    ruleSet.foreach(rules.add)
+  }
+
+  /**
+    * Checks whether this program contains the specified rule.
+    */
+  def contains(rule: RelOptRule): Boolean = {
+    Preconditions.checkNotNull(rule)
+    rules.contains(rule)
+  }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkVolcanoProgram.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkVolcanoProgram.scala
new file mode 100644
index 0000000..7c85c5e
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkVolcanoProgram.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.util.Preconditions
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
+import org.apache.calcite.plan.{RelOptUtil, RelTrait}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.tools.{Programs, RuleSet}
+
+/**
+  * A FlinkRuleSetProgram that runs with [[org.apache.calcite.plan.volcano.VolcanoPlanner]].
+  *
+  * @tparam OC OptimizeContext
+  */
+class FlinkVolcanoProgram[OC <: FlinkOptimizeContext] extends FlinkRuleSetProgram[OC] {
+
+  /**
+    * Required output traits, this must not be None when doing optimize.
+    */
+  protected var requiredOutputTraits: Option[Array[RelTrait]] = None
+
+  /**
+    * Optimizes `root` with the rules of this program, using the planner obtained from
+    * `context.getVolcanoPlanner`.
+    *
+    * @param root    the relational expression to optimize
+    * @param context the context providing the planner instance
+    * @return `root` itself when this program has no rules, otherwise the result of running
+    *         the rules against the required output traits
+    * @throws TableException if the required output traits were never set, or if planning
+    *                        fails; the original failure is attached as the cause
+    * @throws AssertionError if the planner raises an assertion error (re-thrown with a prefix)
+    */
+  override def optimize(root: RelNode, context: OC): RelNode = {
+    if (rules.isEmpty) {
+      return root
+    }
+
+    val outputTraits = requiredOutputTraits.getOrElse(
+      throw new TableException("Required output traits should not be None in FlinkVolcanoProgram"))
+
+    val targetTraits = root.getTraitSet.plusAll(outputTraits).simplify()
+    // reuse VolcanoPlanner instance defined in context
+    val planner = Preconditions.checkNotNull(context.getVolcanoPlanner)
+    val optProgram = Programs.ofRules(rules)
+
+    try {
+      optProgram.run(
+        planner,
+        root,
+        targetTraits,
+        ImmutableList.of(),
+        ImmutableList.of())
+    } catch {
+      // every wrapper below keeps the original exception as its cause so that the
+      // planner stack trace is not lost
+      case e: CannotPlanException =>
+        throw new TableException(
+          s"Cannot generate a valid execution plan for the given query: \n\n" +
+            s"${RelOptUtil.toString(root)}\n" +
+            s"This exception indicates that the query uses an unsupported SQL feature.\n" +
+            s"Please check the documentation for the set of currently supported SQL features.",
+          e)
+      case t: TableException =>
+        throw new TableException(
+          s"Cannot generate a valid execution plan for the given query: \n\n" +
+            s"${RelOptUtil.toString(root)}\n" +
+            s"${t.getMessage}\n" +
+            s"Please check the documentation for the set of currently supported SQL features.",
+          t)
+      case a: AssertionError =>
+        throw new AssertionError(s"Sql optimization: Assertion error: ${a.getMessage}", a)
+      // a null cause does not match here: isInstanceOf is false for null
+      case r: RuntimeException if r.getCause.isInstanceOf[TableException] =>
+        throw new TableException(
+          s"Sql optimization: Cannot generate a valid execution plan for the given query: \n\n" +
+            s"${RelOptUtil.toString(root)}\n" +
+            s"${r.getMessage}\n" +
+            s"Please check the documentation for the set of currently supported SQL features.",
+          r)
+    }
+  }
+
+  /**
+    * Sets required output traits.
+    *
+    * @param relTraits the traits the optimized expression must have, must not be null
+    */
+  def setRequiredOutputTraits(relTraits: Array[RelTrait]): Unit = {
+    Preconditions.checkNotNull(relTraits)
+    requiredOutputTraits = Some(relTraits)
+  }
+
+}
+
+/**
+  * Fluent builder for [[FlinkVolcanoProgram]].
+  *
+  * Every call configures the single program instance owned by this builder, and
+  * `build()` hands out that same instance.
+  *
+  * @tparam OC OptimizeContext
+  */
+class FlinkVolcanoProgramBuilder[OC <: FlinkOptimizeContext] {
+  private val program = new FlinkVolcanoProgram[OC]
+
+  /**
+    * Registers the given rules with the program under construction.
+    */
+  def add(ruleSet: RuleSet): FlinkVolcanoProgramBuilder[OC] = {
+    program.add(ruleSet)
+    this
+  }
+
+  /**
+    * Defines the traits the program must produce.
+    */
+  def setRequiredOutputTraits(relTraits: Array[RelTrait]): FlinkVolcanoProgramBuilder[OC] = {
+    program.setRequiredOutputTraits(relTraits)
+    this
+  }
+
+  /**
+    * Returns the configured program.
+    */
+  def build(): FlinkVolcanoProgram[OC] = program
+
+}
+
+/**
+  * Factory for [[FlinkVolcanoProgramBuilder]].
+  */
+object FlinkVolcanoProgramBuilder {
+
+  /**
+    * Creates a new, empty builder.
+    *
+    * @tparam OC OptimizeContext
+    */
+  def newBuilder[OC <: FlinkOptimizeContext]: FlinkVolcanoProgramBuilder[OC] = {
+    new FlinkVolcanoProgramBuilder[OC]
+  }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
new file mode 100644
index 0000000..34fdafb
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/StreamOptimizeContext.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+/**
+  * An OptimizeContext that allows obtaining stream table environment information
+  * when optimizing.
+  *
+  * The trait declares no members of its own; it only extends [[FlinkOptimizeContext]].
+  */
+trait StreamOptimizeContext extends FlinkOptimizeContext {
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/Logging.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/Logging.scala
new file mode 100644
index 0000000..b6be99e6
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/Logging.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util
+
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+  * Helper trait to ensure the logger is never serialized.
+  *
+  * Mixing it in exposes an SLF4J logger as `LOG`. The logger is obtained from
+  * `LoggerFactory.getLogger(getClass)` and is declared `@transient lazy`, so it is
+  * created on first access.
+  */
+trait Logging {
+  @transient lazy val LOG: Logger = LoggerFactory.getLogger(getClass)
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkChainedProgramTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkChainedProgramTest.scala
new file mode 100644
index 0000000..5ccebc8
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkChainedProgramTest.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.calcite.plan.Convention
+import org.apache.calcite.plan.hep.{HepMatchOrder, HepProgramBuilder}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.tools.RuleSets
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConversions._
+
+/**
+  * Tests for [[FlinkChainedProgram]].
+  */
+class FlinkChainedProgramTest {
+
+  /**
+    * Walks one chained program through addFirst, addLast, addBefore and remove, and
+    * asserts the order of the program names and the identity of the stored programs
+    * after each step. The assertions depend on the order of the statements.
+    */
+  @Test
+  def testAddGetRemovePrograms(): Unit = {
+    val programs = new FlinkChainedProgram
+    assertTrue(programs.getProgramNames.isEmpty)
+    assertTrue(programs.get("o1").isEmpty)
+
+    // test addFirst
+    val builder = new HepProgramBuilder()
+    builder
+      .addMatchLimit(10)
+      .addMatchOrder(HepMatchOrder.ARBITRARY)
+      .addRuleInstance(SubQueryRemoveRule.FILTER)
+      .addRuleInstance(SubQueryRemoveRule.PROJECT)
+      .addRuleInstance(SubQueryRemoveRule.JOIN)
+      .addMatchLimit(100)
+      .addMatchOrder(HepMatchOrder.BOTTOM_UP)
+      .addRuleCollection(Array(
+        TableScanRule.INSTANCE,
+        ValuesReduceRule.FILTER_INSTANCE
+      ).toList)
+    val program1 = FlinkHepProgram(builder.build())
+    assertTrue(programs.addFirst("o2", program1))
+    assertEquals(List("o2"), programs.getProgramNames.toList)
+    assertTrue(programs.get("o2").isDefined)
+    assertTrue(program1 eq programs.get("o2").get)
+
+    // a second addFirst puts "o1" in front of "o2"
+    val program2 = FlinkHepRuleSetProgramBuilder.newBuilder
+      .add(RuleSets.ofList(
+        ReduceExpressionsRule.FILTER_INSTANCE,
+        ReduceExpressionsRule.PROJECT_INSTANCE,
+        ReduceExpressionsRule.CALC_INSTANCE,
+        ReduceExpressionsRule.JOIN_INSTANCE
+      )).build()
+    assertTrue(programs.addFirst("o1", program2))
+    assertEquals(List("o1", "o2"), programs.getProgramNames.toList)
+    assertTrue(programs.get("o1").isDefined)
+    assertTrue(program2 eq programs.get("o1").get)
+
+    // test addLast
+    val program3 = FlinkHepRuleSetProgramBuilder.newBuilder
+      .add(RuleSets.ofList(
+        FilterCalcMergeRule.INSTANCE,
+        ProjectCalcMergeRule.INSTANCE,
+        FilterToCalcRule.INSTANCE,
+        ProjectToCalcRule.INSTANCE,
+        CalcMergeRule.INSTANCE))
+      .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+      .setMatchLimit(10000)
+      .setHepMatchOrder(HepMatchOrder.ARBITRARY)
+      .build()
+    assertTrue(programs.addLast("o4", program3))
+    assertEquals(List("o1", "o2", "o4"), programs.getProgramNames.toList)
+    assertTrue(programs.get("o4").isDefined)
+    assertTrue(program3 eq programs.get("o4").get)
+
+    // test addBefore
+    val TEST = new Convention.Impl("TEST", classOf[RelNode])
+    val program4 = FlinkVolcanoProgramBuilder.newBuilder
+      .add(RuleSets.ofList(
+        FilterJoinRule.FILTER_ON_JOIN,
+        FilterJoinRule.JOIN))
+      .setRequiredOutputTraits(Array(TEST))
+      .build()
+    // "o3" is inserted in front of the existing "o4"
+    assertTrue(programs.addBefore("o4", "o3", program4))
+    assertEquals(List("o1", "o2", "o3", "o4"), programs.getProgramNames.toList)
+    assertTrue(programs.get("o3").isDefined)
+    assertTrue(program4 eq programs.get("o3").get)
+
+    // test remove
+    val p2 = programs.remove("o2")
+    assertTrue(p2.isDefined)
+    assertTrue(p2.get eq program1)
+    assertEquals(List("o1", "o3", "o4"), programs.getProgramNames.toList)
+    // removing an unknown name yields nothing and leaves the chain untouched
+    assertTrue(programs.remove("o0").isEmpty)
+    assertEquals(List("o1", "o3", "o4"), programs.getProgramNames.toList)
+
+    // program already exists
+    assertFalse(programs.addFirst("o3", program1))
+    assertFalse(programs.addLast("o4", program1))
+    // NOTE(review): here the reference name "o0" is unknown and the new name "o4" is
+    // already taken; which of the two makes addBefore return false is not visible here
+    assertFalse(programs.addBefore("o0", "o4", program1))
+    assertEquals(List("o1", "o3", "o4"), programs.getProgramNames.toList)
+  }
+
+  /**
+    * Checks that getFlinkRuleSetProgram returns a defined value for the program built
+    * with FlinkHepRuleSetProgramBuilder ("o1"), and an empty one both for the
+    * FlinkHepProgram ("o2") and for a name that was never added ("o3").
+    */
+  @Test
+  def testGetFlinkRuleSetProgram(): Unit = {
+    val programs = new FlinkChainedProgram
+    assertTrue(programs.getProgramNames.isEmpty)
+
+    val program1 = FlinkHepRuleSetProgramBuilder.newBuilder
+      .add(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+      .build()
+    programs.addFirst("o1", program1)
+    assertTrue(programs.get("o1").isDefined)
+    assertTrue(program1 eq programs.get("o1").get)
+
+    val builder = new HepProgramBuilder()
+    builder
+      .addMatchLimit(10)
+      .addRuleInstance(SubQueryRemoveRule.FILTER)
+      .addRuleInstance(SubQueryRemoveRule.JOIN)
+      .addMatchOrder(HepMatchOrder.BOTTOM_UP)
+    val program2 = FlinkHepProgram(builder.build())
+    programs.addLast("o2", program2)
+    assertTrue(programs.get("o2").isDefined)
+    assertTrue(program2 eq programs.get("o2").get)
+    assertTrue(programs.getFlinkRuleSetProgram("o2").isEmpty)
+
+    assertTrue(programs.getFlinkRuleSetProgram("o3").isEmpty)
+
+    val p1 = programs.getFlinkRuleSetProgram("o1")
+    assertTrue(p1.isDefined)
+    // adding rules through the returned program must not change which instance is returned
+    p1.get.add(RuleSets.ofList(SubQueryRemoveRule.PROJECT))
+    assertTrue(p1.get eq programs.getFlinkRuleSetProgram("o1").get)
+  }
+
+  /**
+    * Expects a NullPointerException when a null program is passed to addLast.
+    */
+  @Test(expected = classOf[NullPointerException])
+  def testAddNullProgram(): Unit = {
+    val programs = new FlinkChainedProgram[BatchOptimizeContext]
+    programs.addLast("o1", null)
+  }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkHepRuleSetProgramTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkHepRuleSetProgramTest.scala
new file mode 100644
index 0000000..465edfb
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkHepRuleSetProgramTest.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.tools.RuleSets
+import org.junit.Assert.{assertFalse, assertTrue}
+import org.junit.Test
+
+/**
+  * Tests for [[FlinkHepRuleSetProgram]].
+  */
+class FlinkHepRuleSetProgramTest {
+
+  /**
+    * Builds a program with rules, execution type, match limit and match order set;
+    * the test passes when no exception is thrown.
+    */
+  @Test
+  def testBuildFlinkHepRuleSetProgram(): Unit = {
+    FlinkHepRuleSetProgramBuilder.newBuilder
+      .add(RuleSets.ofList(
+        ReduceExpressionsRule.FILTER_INSTANCE,
+        ReduceExpressionsRule.PROJECT_INSTANCE,
+        ReduceExpressionsRule.CALC_INSTANCE,
+        ReduceExpressionsRule.JOIN_INSTANCE
+      ))
+      .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+      .setMatchLimit(10)
+      .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+      .build()
+  }
+
+  /**
+    * Expects an IllegalArgumentException for a match limit of 0.
+    */
+  @Test(expected = classOf[IllegalArgumentException])
+  def testMatchLimitLessThan1(): Unit = {
+    FlinkHepRuleSetProgramBuilder.newBuilder.setMatchLimit(0)
+  }
+
+  /**
+    * Expects a NullPointerException for a null match order.
+    */
+  @Test(expected = classOf[NullPointerException])
+  def testNullHepMatchOrder(): Unit = {
+    FlinkHepRuleSetProgramBuilder.newBuilder.setHepMatchOrder(null)
+  }
+
+  /**
+    * Expects a NullPointerException for a null rules execution type.
+    */
+  @Test(expected = classOf[NullPointerException])
+  def testNullHepRulesExecutionType(): Unit = {
+    FlinkHepRuleSetProgramBuilder.newBuilder.setHepRulesExecutionType(null)
+  }
+
+  /**
+    * Exercises contains, remove, replaceAll and add on one program instance.
+    * Each group of assertions checks the rule set left behind by the preceding call,
+    * so the statement order matters.
+    */
+  @Test
+  def testRuleOperations(): Unit = {
+    val program = FlinkHepRuleSetProgramBuilder.newBuilder
+      .add(RuleSets.ofList(
+        ReduceExpressionsRule.FILTER_INSTANCE,
+        ReduceExpressionsRule.PROJECT_INSTANCE,
+        ReduceExpressionsRule.CALC_INSTANCE,
+        ReduceExpressionsRule.JOIN_INSTANCE
+      )).build()
+
+    assertTrue(program.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+    assertTrue(program.contains(ReduceExpressionsRule.PROJECT_INSTANCE))
+    assertTrue(program.contains(ReduceExpressionsRule.CALC_INSTANCE))
+    assertTrue(program.contains(ReduceExpressionsRule.JOIN_INSTANCE))
+    assertFalse(program.contains(SubQueryRemoveRule.FILTER))
+
+    // remove drops only the two listed rules
+    program.remove(RuleSets.ofList(
+      ReduceExpressionsRule.FILTER_INSTANCE,
+      ReduceExpressionsRule.PROJECT_INSTANCE))
+    assertFalse(program.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+    assertFalse(program.contains(ReduceExpressionsRule.PROJECT_INSTANCE))
+    assertTrue(program.contains(ReduceExpressionsRule.CALC_INSTANCE))
+    assertTrue(program.contains(ReduceExpressionsRule.JOIN_INSTANCE))
+
+    // replaceAll discards the remaining rules and keeps only the new one
+    program.replaceAll(RuleSets.ofList(SubQueryRemoveRule.FILTER))
+    assertFalse(program.contains(ReduceExpressionsRule.CALC_INSTANCE))
+    assertFalse(program.contains(ReduceExpressionsRule.JOIN_INSTANCE))
+    assertTrue(program.contains(SubQueryRemoveRule.FILTER))
+
+    // add keeps the existing rule and appends the new ones
+    program.add(RuleSets.ofList(
+      SubQueryRemoveRule.PROJECT,
+      SubQueryRemoveRule.JOIN))
+    assertTrue(program.contains(SubQueryRemoveRule.FILTER))
+    assertTrue(program.contains(SubQueryRemoveRule.PROJECT))
+    assertTrue(program.contains(SubQueryRemoveRule.JOIN))
+  }
+
+  /**
+    * Expects a NullPointerException when a null rule set is added.
+    */
+  @Test(expected = classOf[NullPointerException])
+  def testNullRuleSets(): Unit = {
+    FlinkHepRuleSetProgramBuilder.newBuilder.add(null)
+  }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkVolcanoProgramTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkVolcanoProgramTest.scala
new file mode 100644
index 0000000..31bf32b
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/optimize/program/FlinkVolcanoProgramTest.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.optimize.program
+
+import org.apache.calcite.plan.Convention
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.tools.RuleSets
+import org.junit.Test
+
+/**
+  * Tests for [[FlinkVolcanoProgram]].
+  */
+class FlinkVolcanoProgramTest {
+
+  /**
+    * Builds a program with rules and required output traits set;
+    * the test passes when no exception is thrown.
+    */
+  @Test
+  def testBuildFlinkVolcanoProgram(): Unit = {
+    val TEST = new Convention.Impl("TEST", classOf[RelNode])
+    FlinkVolcanoProgramBuilder.newBuilder
+      .add(RuleSets.ofList(
+        ReduceExpressionsRule.FILTER_INSTANCE,
+        ReduceExpressionsRule.PROJECT_INSTANCE,
+        ReduceExpressionsRule.CALC_INSTANCE,
+        ReduceExpressionsRule.JOIN_INSTANCE
+      ))
+      .setRequiredOutputTraits(Array(TEST))
+      .build()
+  }
+
+  /**
+    * Expects a NullPointerException when null is passed as required output traits.
+    */
+  @Test(expected = classOf[NullPointerException])
+  def testNullRequiredOutputTraits(): Unit = {
+    FlinkVolcanoProgramBuilder.newBuilder.setRequiredOutputTraits(null)
+  }
+
+}

Reply via email to