[ https://issues.apache.org/jira/browse/FLINK-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15867642#comment-15867642 ]
ASF GitHub Bot commented on FLINK-5624: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3252#discussion_r101248097 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.rules.datastream + +import java.util.Calendar + +import com.google.common.collect.ImmutableList +import org.apache.calcite.avatica.util.TimeUnitRange +import org.apache.calcite.plan._ +import org.apache.calcite.plan.hep.HepRelVertex +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject} +import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode} +import org.apache.calcite.sql.fun.SqlFloorFunction +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.api.scala.Tumble +import org.apache.flink.table.api.{TableException, TumblingWindow, Window} +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.EventTimeExtractor +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate + +import scala.collection.JavaConversions._ + +class LogicalWindowAggregateRule + extends RelOptRule( + LogicalWindowAggregateRule.LOGICAL_WINDOW_PREDICATE, + "LogicalWindowAggregateRule") { + + override def matches(call: RelOptRuleCall): Boolean = { + val agg = call.rel(0).asInstanceOf[LogicalAggregate] + + val distinctAggs = agg.getAggCallList.exists(_.isDistinct) + val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet + + val windowClause = recognizeWindow(agg) + !distinctAggs && !groupSets && !agg.indicator && windowClause.isDefined + } + + /** + * Transform LogicalAggregate with windowing expression to LogicalProject + * + LogicalWindowAggregate + LogicalProject. + * + * The transformation adds an additional LogicalProject at the top to ensure + * that the types are equivalent. 
+ */ + override def onMatch(call: RelOptRuleCall): Unit = { + val agg = call.rel[LogicalAggregate](0) + val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject] + val (windowExprIdx, window) = recognizeWindow(agg).get + val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx)) + + val builder = call.builder() + val rexBuilder = builder.getRexBuilder + val zero = rexBuilder.makeTimestampLiteral(LogicalWindowAggregateRule.TIMESTAMP_ZERO, 3) + + val newAgg = builder + .push(project.getInput) + .project(project.getChildExps.updated(windowExprIdx, zero)) + .aggregate(builder.groupKey( + newGroupSet, + agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList) + .build().asInstanceOf[LogicalAggregate] + + // Create an additional project to conform with types + val transformed = call.builder() + transformed.push(LogicalWindowAggregate.create( + window.toLogicalWindow, + Seq[NamedWindowProperty](), + newAgg)) + .project(List(zero) ++ transformed.fields()) --- End diff -- The `zero` element must be injected at the position of the window attribute in the grouping set. If you change the order of grouping attributes in the SQL query in `testMultiGroup()` to `GROUP BY id, FLOOR(rowtime() TO HOUR)`, the planning fails. > Support tumbling window on streaming tables in the SQL API > ---------------------------------------------------------- > > Key: FLINK-5624 > URL: https://issues.apache.org/jira/browse/FLINK-5624 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Haohui Mai > Assignee: Haohui Mai > > This is a follow-up to FLINK-4691. > FLINK-4691 adds support for group-windows for streaming tables. This jira > proposes to expose the functionality in the SQL layer via the {{GROUP BY}} > clauses, as described in > http://calcite.apache.org/docs/stream.html#tumbling-windows. -- This message was sent by Atlassian JIRA (v6.3.15#6346)