swuferhong commented on code in PR #23001:
URL: https://github.com/apache/flink/pull/23001#discussion_r1266674440


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##########
@@ -343,4 +369,59 @@ object WindowUtil {
     }
   }
 
+  private def containsNeighbourWindowOperator(
+      agg: FlinkLogicalAggregate,
+      fmq: FlinkRelMetadataQuery): Boolean = {
+
+    def find(rel: RelNode, fmq: FlinkRelMetadataQuery): Unit = {
+      rel match {
+        case rss: RelSubset =>
+          val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
+          find(innerRel, fmq)
+
+        case scan: FlinkLogicalTableFunctionScan =>
+          if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) {
+            throw new Util.FoundOne(scan)
+          }
+          find(scan.getInput(0), fmq)
+
+        case aggregate: FlinkLogicalAggregate =>
+          val winProperties = fmq.getRelWindowProperties(aggregate.getInput)
+          val groups = aggregate.getGroupSet
+          // window agg
+          if (WindowUtil.groupingContainsWindowStartEnd(groups, winProperties)) {
+            throw new Util.FoundOne(aggregate)

Review Comment:
   Why not return directly?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##########
@@ -343,4 +369,59 @@ object WindowUtil {
     }
   }
 
+  private def containsNeighbourWindowOperator(
+      agg: FlinkLogicalAggregate,
+      fmq: FlinkRelMetadataQuery): Boolean = {
+
+    def find(rel: RelNode, fmq: FlinkRelMetadataQuery): Unit = {
+      rel match {
+        case rss: RelSubset =>
+          val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
+          find(innerRel, fmq)
+
+        case scan: FlinkLogicalTableFunctionScan =>
+          if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) {
+            throw new Util.FoundOne(scan)
+          }
+          find(scan.getInput(0), fmq)
+
+        case aggregate: FlinkLogicalAggregate =>
+          val winProperties = fmq.getRelWindowProperties(aggregate.getInput)
+          val groups = aggregate.getGroupSet
+          // window agg
+          if (WindowUtil.groupingContainsWindowStartEnd(groups, winProperties)) {
+            throw new Util.FoundOne(aggregate)
+          }
+          find(aggregate.getInput, fmq)
+
+        case rank: FlinkLogicalRank =>
+          val winProperties = fmq.getRelWindowProperties(rank.getInput)
+          val partitionKey = rank.partitionKey
+          // both window rank & deduplicate
+          if (WindowUtil.groupingContainsWindowStartEnd(partitionKey, winProperties)) {
+            throw new Util.FoundOne(rank)
+          }
+          find(rank.getInput, fmq)
+
+        case join: FlinkLogicalJoin =>
+          // window join
+          if (satisfyWindowJoin(join)) {
+            throw new Util.FoundOne(join)
+          }
+        // other joins cannot propagate both the window_start and window_end time attributes, so
+        // further traversal of child nodes is unnecessary

Review Comment:
   The line break before this comment is incorrect; the comment should be indented to align with the body of the `case` branch.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala:
##########
@@ -51,9 +51,7 @@ class StreamPhysicalGroupAggregateRule(config: Config) extends ConverterRule(con
 
     // check not window aggregate
     val fmq = FlinkRelMetadataQuery.reuseOrCreate(call.getMetadataQuery)
-    val windowProperties = fmq.getRelWindowProperties(agg.getInput)
-    val grouping = agg.getGroupSet
-    !WindowUtil.groupingContainsWindowStartEnd(grouping, windowProperties)
+    !WindowUtil.isValidWindowAggregate(agg, fmq)

Review Comment:
   Remove the input parameter `fmq`; we can obtain the metadata query via `agg.getCluster.getMetadataQuery` inside the `isValidWindowAggregate` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to