[ 
https://issues.apache.org/jira/browse/FLINK-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15867642#comment-15867642
 ] 

ASF GitHub Bot commented on FLINK-5624:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3252#discussion_r101248097
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
 ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import java.util.Calendar
    +
    +import com.google.common.collect.ImmutableList
    +import org.apache.calcite.avatica.util.TimeUnitRange
    +import org.apache.calcite.plan._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
    +import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
    +import org.apache.calcite.sql.fun.SqlFloorFunction
    +import org.apache.calcite.util.ImmutableBitSet
    +import org.apache.flink.table.api.scala.Tumble
    +import org.apache.flink.table.api.{TableException, TumblingWindow, Window}
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.functions.EventTimeExtractor
    +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
    +
    +import scala.collection.JavaConversions._
    +
    +class LogicalWindowAggregateRule
    +  extends RelOptRule(
    +    LogicalWindowAggregateRule.LOGICAL_WINDOW_PREDICATE,
    +    "LogicalWindowAggregateRule") {
    +
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val agg = call.rel(0).asInstanceOf[LogicalAggregate]
    +
    +    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
    +    val groupSets = agg.getGroupSets.size() != 1 || 
agg.getGroupSets.get(0) != agg.getGroupSet
    +
    +    val windowClause = recognizeWindow(agg)
    +    !distinctAggs && !groupSets && !agg.indicator && windowClause.isDefined
    +  }
    +
    +  /**
    +    * Transform LogicalAggregate with windowing expression to 
LogicalProject
    +    * + LogicalWindowAggregate + LogicalProject.
    +    *
    +    * The transformation adds an additional LogicalProject at the top to 
ensure
    +    * that the types are equivalent.
    +    */
    +  override def onMatch(call: RelOptRuleCall): Unit = {
    +    val agg = call.rel[LogicalAggregate](0)
    +    val project = 
agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
    +    val (windowExprIdx, window) = recognizeWindow(agg).get
    +    val newGroupSet = 
agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))
    +
    +    val builder = call.builder()
    +    val rexBuilder = builder.getRexBuilder
    +    val zero = 
rexBuilder.makeTimestampLiteral(LogicalWindowAggregateRule.TIMESTAMP_ZERO, 3)
    +
    +    val newAgg = builder
    +      .push(project.getInput)
    +      .project(project.getChildExps.updated(windowExprIdx, zero))
    +      .aggregate(builder.groupKey(
    +        newGroupSet,
    +        agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList)
    +      .build().asInstanceOf[LogicalAggregate]
    +
    +    // Create an additional project to conform with types
    +    val transformed = call.builder()
    +    transformed.push(LogicalWindowAggregate.create(
    +      window.toLogicalWindow,
    +      Seq[NamedWindowProperty](),
    +      newAgg))
    +      .project(List(zero) ++ transformed.fields())
    --- End diff --
    
    The `zero` element must be injected at the position of the window attribute 
in the grouping set. 
    If you change the order of the grouping attributes in the SQL query in 
`testMultiGroup()` to `GROUP BY id, FLOOR(rowtime() TO HOUR)`, planning 
fails.


> Support tumbling window on streaming tables in the SQL API
> ----------------------------------------------------------
>
>                 Key: FLINK-5624
>                 URL: https://issues.apache.org/jira/browse/FLINK-5624
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>
> This is a follow-up to FLINK-4691.
> FLINK-4691 adds support for group-windows for streaming tables. This jira 
> proposes to expose the functionality in the SQL layer via the {{GROUP BY}} 
> clause, as described in 
> http://calcite.apache.org/docs/stream.html#tumbling-windows.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to