[
https://issues.apache.org/jira/browse/FLINK-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15867642#comment-15867642
]
ASF GitHub Bot commented on FLINK-5624:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3252#discussion_r101248097
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import java.util.Calendar
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.avatica.util.TimeUnitRange
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
+import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.fun.SqlFloorFunction
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.api.scala.Tumble
+import org.apache.flink.table.api.{TableException, TumblingWindow, Window}
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.EventTimeExtractor
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+
+import scala.collection.JavaConversions._
+
+class LogicalWindowAggregateRule
+ extends RelOptRule(
+ LogicalWindowAggregateRule.LOGICAL_WINDOW_PREDICATE,
+ "LogicalWindowAggregateRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+
+ val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+ val groupSets = agg.getGroupSets.size() != 1 ||
agg.getGroupSets.get(0) != agg.getGroupSet
+
+ val windowClause = recognizeWindow(agg)
+ !distinctAggs && !groupSets && !agg.indicator && windowClause.isDefined
+ }
+
+ /**
+ * Transform LogicalAggregate with windowing expression to
LogicalProject
+ * + LogicalWindowAggregate + LogicalProject.
+ *
+ * The transformation adds an additional LogicalProject at the top to
ensure
+ * that the types are equivalent.
+ */
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val agg = call.rel[LogicalAggregate](0)
+ val project =
agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
+ val (windowExprIdx, window) = recognizeWindow(agg).get
+ val newGroupSet =
agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))
+
+ val builder = call.builder()
+ val rexBuilder = builder.getRexBuilder
+ val zero =
rexBuilder.makeTimestampLiteral(LogicalWindowAggregateRule.TIMESTAMP_ZERO, 3)
+
+ val newAgg = builder
+ .push(project.getInput)
+ .project(project.getChildExps.updated(windowExprIdx, zero))
+ .aggregate(builder.groupKey(
+ newGroupSet,
+ agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList)
+ .build().asInstanceOf[LogicalAggregate]
+
+ // Create an additional project to conform with types
+ val transformed = call.builder()
+ transformed.push(LogicalWindowAggregate.create(
+ window.toLogicalWindow,
+ Seq[NamedWindowProperty](),
+ newAgg))
+ .project(List(zero) ++ transformed.fields())
--- End diff --
The `zero` element must be injected at the position of the window attribute
in the grouping set.
If you change the order of grouping attributes in the SQL query in the
`testMultiGroup()` to `GROUP BY id, FLOOR(rowtime() TO HOUR)`, the planning
fails.
> Support tumbling window on streaming tables in the SQL API
> ----------------------------------------------------------
>
> Key: FLINK-5624
> URL: https://issues.apache.org/jira/browse/FLINK-5624
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Haohui Mai
> Assignee: Haohui Mai
>
> This is a follow up of FLINK-4691.
> FLINK-4691 adds supports for group-windows for streaming tables. This jira
> proposes to expose the functionality in the SQL layer via the {{GROUP BY}}
> clauses, as described in
> http://calcite.apache.org/docs/stream.html#tumbling-windows.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)