[
https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15966528#comment-15966528
]
ASF GitHub Bot commented on FLINK-6090:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3696#discussion_r111244797
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.RelNode
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Collection of retraction rules that apply various transformations on
DataStreamRel trees.
+ * Currently, there are three transformations: InitProcessRule,
NeedToRetractProcessRule and
+ * AccModeProcessRule. Note: these rules must be called in order
(InitProcessRule ->
+ * NeedToRetractProcessRule -> AccModeProcessRule).
+ */
+object DataStreamRetractionRule {
+
+ /**
+ * Singleton rule that init retraction trait inside a [[DataStreamRel]]
+ */
+ val INIT_INSTANCE = new InitProcessRule()
+
+ /**
+ * Singleton rule that decide needToRetract property inside a
[[DataStreamRel]]
+ */
+ val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
+
+ /**
+ * Singleton rule that decide accMode inside a [[DataStreamRel]]
+ */
+ val ACCMODE_INSTANCE = new AccModeProcessRule()
+
+ /**
+ * Get all child RelNodes of a RelNode
+ * @param topRel The input RelNode
+ * @return All child nodes
+ */
+ def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
+ val topRelInputs = new ListBuffer[RelNode]()
+ topRelInputs.++=(topRel.getInputs.asScala)
+ topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
+ }
+
+ def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
+ val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
+ if (null == retractionTrait) {
--- End diff --
can be simplified to
`null != retraction && retractionTrait.getNeedToRetract`
> Add RetractionRule at the stage of decoration
> ---------------------------------------------
>
> Key: FLINK-6090
> URL: https://issues.apache.org/jira/browse/FLINK-6090
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Hequn Cheng
>
> Implement optimizer for retraction:
> 1.Add RetractionRule at the stage of decoration,which can derive the
> replace table/append table, NeedRetraction property.
> 2.Match the NeedRetraction and replace table, mark the accumulating mode
>
> When this task is finished, we can turn on retraction for different operators
> according to accumulating mode.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)