[
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15966513#comment-15966513
]
ASF GitHub Bot commented on FLINK-6090:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3696#discussion_r110392466
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Collection of retraction rules that apply various transformations on
DataStreamRel trees.
+ * Currently, there are three transformations: InitProcessRule,
NeedToRetractProcessRule and
+ * AccModeProcessRule. Note: these rules must be called in order
(InitProcessRule ->
+ * NeedToRetractProcessRule -> AccModeProcessRule).
+ */
+object DataStreamRetractionRule {
+
+ /**
+ * Singleton rule that init retraction trait inside a [[DataStreamRel]]
+ */
+ val INIT_INSTANCE = new InitProcessRule()
+
+ /**
+ * Singleton rule that decide needToRetract property inside a
[[DataStreamRel]]
+ */
+ val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+ /**
+ * Singleton rule that decide accMode inside a [[DataStreamRel]]
+ */
+ val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+ /**
+ * Get all child RelNodes of a RelNode
+ * @param topRel The input RelNode
+ * @return All child nodes
+ */
+ def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+ val topRelInputs = new ListBuffer[RelNode]()
+ topRelInputs.++=(topRel.getInputs.asScala)
+ topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
+ }
+
+ def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
+ val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
+ if (null == retractionTrait) {
+ false
+ } else {
+ retractionTrait.getNeedToRetract
+ }
+ }
+
+
+ /**
+ * Find all needToRetract nodes. A node needs to retract means that
there are downstream
+ * nodes need retraction from it. Currently,
[[DataStreamOverAggregate]] and
+ * [[DataStreamGroupWindowAggregate]] need retraction from upstream
nodes, besides, a
+ * needToRetract node also need retraction from it's upstream nodes.
+ */
+ class NeedToRetractProcessRule extends RelOptRule(
+ operand(
+ classOf[DataStreamRel], none()),
+ "NeedToRetractProcessRule") {
+
+ /**
+ * Return true if bottom RelNode does not contain needToRetract and
top RelNode need
+ * retraction from bottom RelNode. Currently, operators which contain
aggregations need
+ * retraction from upstream nodes, besides, a needToRetract node also
needs retraction from
+ * it's upstream nodes.
+ */
+ def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean
= {
+ val bottomTraits = bottomRel.getTraitSet
+ if(!traitSetContainNeedToRetract(bottomTraits)){
+ topRel match {
+ case _: DataStreamGroupAggregate => true
+ case _: DataStreamGroupWindowAggregate => true
+ case _: DataStreamOverAggregate => true
+ case _ if traitSetContainNeedToRetract(topRel.getTraitSet) =>
true
+ case _ => false
+ }
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Add needToRetract for the input RelNode
+ */
+ def addNeedToRetract(relNode: RelNode): RelNode = {
+ val traitSet = relNode.getTraitSet
+ var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
+ if (null == retractionTrait) {
+ retractionTrait = new RetractionTrait(true, AccMode.Acc)
+ } else {
+ retractionTrait = new RetractionTrait(true,
retractionTrait.getAccMode)
+ }
+
+ relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
+ }
+
+ /**
+ * Returns a new topRel and a needTransform flag for a given topRel
and bottomRels. The new
+ * topRel contains new bottomRels with needToRetract properly marked.
The needTransform flag
+ * will be true if any transformation has been done.
+ *
+ * @param topRel The input top RelNode.
+ * @param bottomRels The input bottom RelNodes.
+ * @return A tuple holding a new top RelNode and a needTransform flag
+ */
+ def needToRetractProcess(
+ topRel: RelNode,
+ bottomRels: ListBuffer[RelNode])
+ : (RelNode, Boolean) = {
+
+ var needTransform = false
+ var i = 0
+ while(i < bottomRels.size) {
+ val bottomRel = bottomRels(i)
+ if(bottomNeedToRetract(topRel, bottomRel)) {
--- End diff --
Please add a space between `if` and the opening parenthesis: `if (`
> Add RetractionRule at the stage of decoration
> ---------------------------------------------
>
> Key: FLINK-6090
> URL: https://issues.apache.org/jira/browse/FLINK-6090
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Hequn Cheng
>
> Implement optimizer for retraction:
> 1. Add RetractionRule at the stage of decoration, which can derive the
> replace table/append table and the NeedRetraction property.
> 2. Match the NeedRetraction and replace table, and mark the accumulating mode.
>
> When this task is finished, we can turn on retraction for different operators
> according to the accumulating mode.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)