[
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551527#comment-15551527
]
ASF GitHub Bot commented on FLINK-4691:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82152829
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
TypeInformation}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.expressions.{Expression, Literal}
+import org.apache.flink.api.table.plan.logical._
+import org.apache.flink.api.table.plan.nodes.FlinkAggregate
+import
org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream,
createNonKeyedWindowedStream}
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
+import
org.apache.flink.api.table.runtime.aggregate.{AggregateAllWindowFunction,
AggregateUtil, AggregateWindowFunction}
+import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.api.table.typeutils.{IntervalTypeInfo,
RowTypeInfo, TypeCheckUtils, TypeConverter}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row,
StreamTableEnvironment}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream,
DataStream, KeyedStream, WindowedStream}
+import
org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction,
WindowFunction}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window =>
DataStreamWindow}
+
+import scala.collection.JavaConverters._
+
+/**
+ * DataStream physical node that evaluates a group-window aggregation.
+ *
+ * When `grouping` is non-empty the prepared input is keyed and windowed per
+ * key and aggregated with an `AggregateWindowFunction`; otherwise a
+ * non-keyed (all-)window is aggregated with an `AggregateAllWindowFunction`.
+ *
+ * @param window the logical group window; translated by
+ * `createKeyedWindowedStream` / `createNonKeyedWindowedStream`
+ * @param cluster planner cluster, forwarded to `SingleRel`
+ * @param traitSet trait set, forwarded to `SingleRel`
+ * @param inputNode the single input node, forwarded to `SingleRel`
+ * @param namedAggregates aggregate calls paired with their output names
+ * @param rowRelDataType the row type returned by `deriveRowType`
+ * @param inputType row type of the input; used to describe grouping and
+ * aggregation and to create the aggregate functions
+ * @param grouping indices of the grouping fields (presumably positions in
+ * `inputType` — confirm); empty means a non-keyed aggregation
+ */
+class DataStreamAggregate(
+ window: LogicalWindow,
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputNode: RelNode,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ rowRelDataType: RelDataType,
+ inputType: RelDataType,
+ grouping: Array[Int])
+ extends SingleRel(cluster, traitSet, inputNode)
+ with FlinkAggregate
+ with DataStreamRel {
+
+ /** Returns the row type supplied at construction (`rowRelDataType`). */
+ // NOTE(review): consider an explicit result type (`: RelDataType`).
+ override def deriveRowType() = rowRelDataType
+
+ /**
+ * Creates a copy of this node with the given trait set and `inputs.get(0)`
+ * as its input; window, aggregates, row types and grouping are reused.
+ */
+ override def copy(traitSet: RelTraitSet, inputs:
java.util.List[RelNode]): RelNode = {
+ new DataStreamAggregate(
+ window,
+ cluster,
+ traitSet,
+ inputs.get(0),
+ namedAggregates,
+ getRowType,
+ inputType,
+ grouping)
+ }
+
+ /** Summary string: optional groupBy (only if grouping is non-empty), window and select. */
+ override def toString: String = {
+ s"Aggregate(${ if (!grouping.isEmpty) {
+ s"groupBy: (${groupingToString(inputType, grouping)}), "
+ } else {
+ ""
+ }}window: ($window), " +
+ s"select: (${aggregationToString(inputType, grouping, getRowType,
namedAggregates)}))"
+ }
+
+ /**
+ * Adds this node's terms to the plan explanation: "groupBy" (only when
+ * grouping is non-empty), "window" and "select".
+ */
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .itemIf("groupBy", groupingToString(inputType, grouping),
!grouping.isEmpty)
+ .item("window", window)
+ .item("select", aggregationToString(inputType, grouping, getRowType,
namedAggregates))
+ }
+
+ /**
+ * Translates this node into a DataStream program.
+ *
+ * The input is translated with `TypeConverter.DEFAULT_ROW_TYPE` as its
+ * expected type, mapped by the preparation function returned from
+ * `AggregateUtil.createOperatorFunctionsForAggregates` (`aggregateResult._1`),
+ * then windowed and aggregated with `aggregateResult._2`. With a non-empty
+ * `grouping` the stream is keyed and windowed via `createKeyedWindowedStream`
+ * and `AggregateWindowFunction`; otherwise via `createNonKeyedWindowedStream`
+ * and `AggregateAllWindowFunction`. The window output is declared as a
+ * `RowTypeInfo` built from this node's row type.
+ *
+ * @param tableEnv environment whose config (`getConfig`) is passed to
+ * `createOperatorFunctionsForAggregates` and the conversion mapper
+ * @param expectedType if defined and its type class is not `Row`, a
+ * conversion mapper ("DataStreamAggregateConversion") is appended
+ * @return the aggregated stream, converted when required
+ */
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+ val config = tableEnv.getConfig
+
+ // NOTE(review): keys are 0..grouping.length-1 rather than `grouping`
+ // itself, presumably because the prepare mapper places the grouping
+ // fields first in its output row — confirm against AggregateUtil.
+ val groupingKeys = grouping.indices.toArray
+ // add grouping fields, position keys in the input, and input type
+ val aggregateResult =
AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
+ inputType, getRowType, grouping, config)
+
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(
+ tableEnv,
+ // tell the input operator that this operator currently only
supports Rows as input
+ Some(TypeConverter.DEFAULT_ROW_TYPE))
+
+ // get the output types
+ val fieldTypes: Array[TypeInformation[_]] =
getRowType.getFieldList.asScala
+ .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
+ .toArray
+
+ val aggString = aggregationToString(inputType, grouping, getRowType,
namedAggregates)
+ val prepareOpName = s"prepare select: ($aggString)"
+ // apply the preparation function created by AggregateUtil before windowing
+ val mappedInput = inputDS
+ .map(aggregateResult._1)
+ .name(prepareOpName)
+
+ val groupReduceFunction = aggregateResult._2
+ val rowTypeInfo = new RowTypeInfo(fieldTypes)
+
+ val result = {
+ // grouped / keyed aggregation
+ if (groupingKeys.length > 0) {
+ val aggOpName = s"groupBy: (${groupingToString(inputType,
grouping)}), " +
+ s"window: ($window), " +
+ s"select: ($aggString)"
+ val aggregateFunction = new
AggregateWindowFunction(groupReduceFunction)
+
+ val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+
+ // the cast fixes the existential window type returned by the factory
+ val windowedStream = createKeyedWindowedStream(window, keyedStream)
+ .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+ windowedStream
+ .apply(aggregateFunction)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
+ .asInstanceOf[DataStream[Any]]
+ }
+ // global / non-keyed aggregation
+ else {
+ val aggOpName = s"window: ($window), select: ($aggString)"
+ val aggregateFunction = new
AggregateAllWindowFunction(groupReduceFunction)
+
+ val windowedStream = createNonKeyedWindowedStream(window,
mappedInput)
+ .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+ windowedStream
+ .apply(aggregateFunction)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
+ .asInstanceOf[DataStream[Any]]
+ }
+ }
+
+ // if the expected type is not a Row, inject a mapper to convert to
the expected type
+ expectedType match {
+ case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+ val mapName = s"convert:
(${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+ result.map(getConversionMapper(
+ config = config,
+ nullableInput = false,
+ inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
+ // NOTE(review): same value as the bound `typeInfo`; prefer it over `.get`.
+ expectedType = expectedType.get,
+ conversionOperatorName = "DataStreamAggregateConversion",
+ fieldNames = getRowType.getFieldNames.asScala
+ ))
+ .name(mapName)
+ case _ => result
+ }
+ }
+
+}
+
+object DataStreamAggregate {
+
+ private def createKeyedWindowedStream(groupWindow: LogicalWindow,
stream: KeyedStream[Row, Tuple])
+ : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow
match {
+
+ case ProcessingTimeTumblingGroupWindow(_, size) if
isTimeInterval(size.resultType) =>
+ stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
+
+ case ProcessingTimeTumblingGroupWindow(_, size) =>
+ stream.countWindow(asCount(size))
+
+ case EventTimeTumblingGroupWindow(_, _, size) if
isTimeInterval(size.resultType) =>
+ stream
+ .window(TumblingEventTimeWindows.of(asTime(size)))
+
+ case EventTimeTumblingGroupWindow(_, _, size) =>
+ stream.countWindow(asCount(size))
--- End diff --
Since this would be a major change / addition, I would propose to add this
feature in a separate issue and exclude it from this one.
> Add group-windows for streaming tables
> ---------------------------------------
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
> Implementation of group-windows on streaming tables. This includes
> implementing the API of group-windows, the logical validation for
> group-windows, and the definition of the “rowtime” and “systemtime” keywords.
> Group-windows on batch tables won’t be supported initially; using them will
> throw an exception.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)