[
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551522#comment-15551522
]
ASF GitHub Bot commented on FLINK-4691:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82144274
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
TypeInformation}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.expressions.{Expression, Literal}
+import org.apache.flink.api.table.plan.logical._
+import org.apache.flink.api.table.plan.nodes.FlinkAggregate
+import
org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream,
createNonKeyedWindowedStream}
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
+import
org.apache.flink.api.table.runtime.aggregate.{AggregateAllWindowFunction,
AggregateUtil, AggregateWindowFunction}
+import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.api.table.typeutils.{IntervalTypeInfo,
RowTypeInfo, TypeCheckUtils, TypeConverter}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row,
StreamTableEnvironment}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream,
DataStream, KeyedStream, WindowedStream}
+import
org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction,
WindowFunction}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window =>
DataStreamWindow}
+
+import scala.collection.JavaConverters._
+
+class DataStreamAggregate(
+ window: LogicalWindow,
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputNode: RelNode,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ rowRelDataType: RelDataType,
+ inputType: RelDataType,
+ grouping: Array[Int])
+ extends SingleRel(cluster, traitSet, inputNode)
+ with FlinkAggregate
+ with DataStreamRel {
+
+ override def deriveRowType() = rowRelDataType
+
+ override def copy(traitSet: RelTraitSet, inputs:
java.util.List[RelNode]): RelNode = {
+ new DataStreamAggregate(
+ window,
+ cluster,
+ traitSet,
+ inputs.get(0),
+ namedAggregates,
+ getRowType,
+ inputType,
+ grouping)
+ }
+
+ override def toString: String = {
+ s"Aggregate(${ if (!grouping.isEmpty) {
+ s"groupBy: (${groupingToString(inputType, grouping)}), "
+ } else {
+ ""
+ }}window: ($window), " +
+ s"select: (${aggregationToString(inputType, grouping, getRowType,
namedAggregates)}))"
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .itemIf("groupBy", groupingToString(inputType, grouping),
!grouping.isEmpty)
+ .item("window", window)
+ .item("select", aggregationToString(inputType, grouping, getRowType,
namedAggregates))
+ }
+
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+ val config = tableEnv.getConfig
+
+ val groupingKeys = grouping.indices.toArray
+ // add grouping fields, position keys in the input, and input type
+ val aggregateResult =
AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
--- End diff --
Style: break argument list into line-wise args.
> Add group-windows for streaming tables
> ---------------------------------------
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
> Implementation of group-windows on streaming tables. This includes
> implementing the API of group-windows, the logical validation for
> group-windows, and the definition of the “rowtime” and “systemtime” keywords.
> Group-windows on batch tables won’t be initially supported and will throw an
> exception.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)