[ 
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15966521#comment-15966521
 ] 

ASF GitHub Bot commented on FLINK-6090:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110392776
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
 ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on 
DataStreamRel trees.
    +  * Currently, there are three transformations: InitProcessRule, 
NeedToRetractProcessRule and
    +  * AccModeProcessRule. Note: these rules must be called in order 
(InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a 
[[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that 
there are downstream
    +    * nodes need retraction from it. Currently, 
[[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream 
nodes, besides, a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and 
top RelNode need
    +      * retraction from bottom RelNode. Currently, operators which contain 
aggregations need
    +      * retraction from upstream nodes, besides, a needToRetract node also 
needs retraction from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean 
= {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => 
true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, 
retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel 
and bottomRels. The new
    +      * topRel contains new bottomRels with needToRetract properly marked. 
The needTransform flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    --- End diff --
    
    Scala discourages the use of mutable collections. Hence, I think we should 
not modify the parameter list here. Instead, we should create a new list and 
create the new top rel from that. For example like this:
    
    ```
    val transformedBottom = for (b <- bottomRels) yield {
      if (bottomNeedToRetract(topRel, b)) {
        needTransform = true
        addNeedToRetract(b)
      } else {
        b
      }
    }
    ```


> Add RetractionRule at the stage of decoration
> ---------------------------------------------
>
>                 Key: FLINK-6090
>                 URL: https://issues.apache.org/jira/browse/FLINK-6090
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Hequn Cheng
>
> Implement optimizer for retraction:
>   1. Add RetractionRule at the stage of decoration, which can derive the 
> replace table/append table, NeedRetraction property.
>   2. Match the NeedRetraction and replace table, mark the accumulating mode
>  
> When this task is finished, we can turn on retraction for different operators 
> according to the accumulating mode.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to