[ https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15817980#comment-15817980 ]
ASF GitHub Bot commented on FLINK-4692:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2938#discussion_r95549888
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.TypeConverter
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+ * Flink RelNode that corresponds to a LogicalWindowAggregate.
+ */
+class DataSetWindowAggregate(
+ window: LogicalWindow,
+ namedProperties: Seq[NamedWindowProperty],
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputNode: RelNode,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ rowRelDataType: RelDataType,
+ inputType: RelDataType,
+ grouping: Array[Int])
+ extends SingleRel(cluster, traitSet, inputNode)
+ with FlinkAggregate
+ with DataSetRel {
+
+ override def deriveRowType() = rowRelDataType
+
+ override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ new DataSetWindowAggregate(
+ window,
+ namedProperties,
+ cluster,
+ traitSet,
+ inputs.get(0),
+ namedAggregates,
+ getRowType,
+ inputType,
+ grouping)
+ }
+
+ override def toString: String = {
+ s"Aggregate(${
+ if (!grouping.isEmpty) {
+ s"groupBy: (${groupingToString(inputType, grouping)}), "
+ } else {
+ ""
+ }
+ }window: ($window), " +
+ s"select: (${
+ aggregationToString(
+ inputType,
+ grouping,
+ getRowType,
+ namedAggregates,
+ namedProperties)
+ }))"
+ }
+
+ override def explainTerms(pw: RelWriter): RelWriter = {
+ super.explainTerms(pw)
+ .itemIf("groupBy", groupingToString(inputType, grouping),
!grouping.isEmpty)
+ .item("window", window)
+ .item(
+ "select", aggregationToString(
+ inputType,
+ grouping,
+ getRowType,
+ namedAggregates,
+ namedProperties))
+ }
+
+ override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+ val child = this.getInput
+ val rowCnt = metadata.getRowCount(child)
+ val rowSize = this.estimateRowSize(child.getRowType)
+ val aggCnt = this.namedAggregates.size
+ planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
+ }
+
+ override def translateToPlan(
+ tableEnv: BatchTableEnvironment,
+ expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+ val config = tableEnv.getConfig
+
+ val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
+ tableEnv,
+ // tell the input operator that this operator currently only supports Rows as input
+ Some(TypeConverter.DEFAULT_ROW_TYPE))
+
+ val result = window match {
+ case EventTimeTumblingGroupWindow(_, _, size) =>
+ createEventTimeTumblingWindowDataSet(inputDS, isTimeInterval(size.resultType))
+ case EventTimeSessionGroupWindow(_, _, _) =>
+ throw new UnsupportedOperationException(
+ "Event-time session windows on batch are currently not
supported")
+ case EventTimeSlidingGroupWindow(_, _, _, _) =>
+ throw new UnsupportedOperationException(
+ "Event-time sliding windows on batch are currently not
supported")
+ case _: ProcessingTimeGroupWindow =>
+ throw new UnsupportedOperationException(
+ "Processing-time tumbling windows are not supported on batch
tables, " +
+ "window on batch must declare a time attribute over which the
query is evaluated.")
+ }
+
+ // if the expected type is not a Row, inject a mapper to convert to the expected type
+ expectedType match {
+ case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+ val mapName = s"convert:
(${getRowType.getFieldNames.toList.mkString(", ")})"
+ result.map(
+ getConversionMapper(
+ config = config,
+ nullableInput = false,
+ inputType = resultRowTypeInfo.asInstanceOf[TypeInformation[Any]],
+ expectedType = expectedType.get,
+ conversionOperatorName = "DataSetWindowAggregateConversion",
+ fieldNames = getRowType.getFieldNames
+ ))
+ .name(mapName)
+ case _ => result
+ }
+ }
+
+
+ private def createEventTimeTumblingWindowDataSet(
+ inputDS: DataSet[Any],
+ isTimeWindow: Boolean)
+ : DataSet[Any] = {
+ val mapFunction = createDataSetWindowPrepareMapFunction(
+ window,
+ namedAggregates,
+ grouping,
+ inputType)
+ val groupReduceFunction = createDataSetWindowAggGroupReduceFunction(
+ window,
+ namedAggregates,
+ inputType,
+ getRowType,
+ grouping,
+ namedProperties)
+
+ val mappedInput = inputDS
+ .map(mapFunction)
+ .name(prepareOperatorName)
+
+ if (isTimeWindow) {
+ // grouped time window aggregation
+ val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
+ // group by grouping keys and rowtime field (the last field in the row)
+ val groupingKeys = grouping.indices ++ Seq(mapReturnType.getArity - 1)
+ mappedInput.asInstanceOf[DataSet[Row]]
+ .groupBy(groupingKeys: _*)
+ .reduceGroup(groupReduceFunction)
+ .returns(resultRowTypeInfo)
+ .name(aggregateOperatorName)
+ .asInstanceOf[DataSet[Any]]
+ } else {
+ // count window
+ val groupingKeys = grouping.indices.toArray
+ if (groupingKeys.length > 0) {
+ // grouped aggregation
+ mappedInput.asInstanceOf[DataSet[Row]]
+ .groupBy(groupingKeys: _*)
+ // sort on the time field; it's the one right after the grouping keys
+ .sortGroup(groupingKeys.length, Order.ASCENDING)
--- End diff ---
Shouldn't this be `mapReturnType.getArity - 1`? According to the docs of `AggregateUtil#createDataSetWindowPrepareMapFunction`, the time field should be at the end.
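For illustration, a minimal sketch of what that suggested change would look like in the count-window branch, reusing the `mapReturnType` lookup from the time-window branch above. All names are taken from this diff; this is the reviewer's suggestion, not the merged code:

```scala
// Sketch of the suggested fix: look up the mapper's produced row type and
// sort on its last field, assuming createDataSetWindowPrepareMapFunction
// emits the time field at the end of the row.
val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
mappedInput.asInstanceOf[DataSet[Row]]
  .groupBy(groupingKeys: _*)
  .sortGroup(mapReturnType.getArity - 1, Order.ASCENDING)
```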
> Add tumbling group-windows for batch tables
> -------------------------------------------
>
> Key: FLINK-4692
> URL: https://issues.apache.org/jira/browse/FLINK-4692
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Jark Wu
>
> Add Tumble group-windows for batch tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
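For context, a hypothetical Table API query in the FLIP-11 style that this sub-task is meant to enable on batch tables. The table and its fields ('rowtime, 'a, 'b) as well as the 5-minute window size are illustrative assumptions, not taken from the issue:

```scala
import org.apache.flink.table.api.scala._

// Assumed batch table with a 'rowtime column; tumbling group-window per FLIP-11.
val result = table
  .groupBy('a)                                       // group by key
  .window(Tumble over 5.minutes on 'rowtime as 'w)   // tumble over the 'rowtime column
  .select('a, 'b.sum, 'w.start, 'w.end)              // aggregate plus window properties
```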