[FLINK-4691] [table] Rework window property extraction. - Deduplicate aggregations and window properties in Table API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de03e0ce Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de03e0ce Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de03e0ce Branch: refs/heads/master Commit: de03e0cea16006f04b5f62d3ff70f583f5db9e4f Parents: 44f3977 Author: Fabian Hueske <[email protected]> Authored: Wed Oct 26 01:08:43 2016 +0200 Committer: Fabian Hueske <[email protected]> Committed: Wed Oct 26 23:03:53 2016 +0200 ---------------------------------------------------------------------- .../api/table/plan/ProjectionTranslator.scala | 152 +++++++++++++------ .../nodes/datastream/DataStreamAggregate.scala | 82 ++++++++-- .../AggregateAllTimeWindowFunction.scala | 53 +++++++ .../aggregate/AggregateAllWindowFunction.scala | 19 +-- .../aggregate/AggregateTimeWindowFunction.scala | 57 +++++++ .../aggregate/AggregateWindowFunction.scala | 19 +-- .../runtime/aggregate/PropertyCollector.scala | 42 ----- .../aggregate/TimeWindowPropertyCollector.scala | 54 +++++++ .../table/runtime/aggregate/WindowEndRead.scala | 38 ----- .../runtime/aggregate/WindowPropertyRead.scala | 33 ---- .../runtime/aggregate/WindowStartRead.scala | 38 ----- .../org/apache/flink/api/table/table.scala | 40 ++--- .../scala/stream/table/AggregationsITCase.scala | 18 +-- .../scala/stream/table/GroupWindowTest.scala | 48 +++++- 14 files changed, 424 insertions(+), 269 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala index 2299bd1..d09b03e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala @@ -27,78 +27,144 @@ import scala.collection.mutable.ListBuffer object ProjectionTranslator { /** - * Extracts all aggregation and property expressions (zero, one, or more) from an expression, - * and replaces the original expressions by field accesses expressions. + * Extracts and deduplicates all aggregation and window property expressions (zero, one, or more) + * from all expressions and replaces the original expressions by field accesses expressions. + * + * @param exprs a list of expressions to convert + * @param tableEnv the TableEnvironment + * @return a Tuple3, the first field contains the converted expressions, the second field the + * extracted and deduplicated aggregations, and the third field the extracted and + * deduplicated window properties. */ def extractAggregationsAndProperties( - exp: Expression, + exprs: Seq[Expression], tableEnv: TableEnvironment) - : (Expression, List[NamedExpression], List[NamedExpression]) = { + : (Seq[NamedExpression], Seq[NamedExpression], Seq[NamedExpression]) = { + + val (aggNames, propNames) = + exprs.foldLeft( (Map[Expression, String](), Map[Expression, String]()) ) { + (x, y) => identifyAggregationsAndProperties(y, tableEnv, x._1, x._2) + } + + val replaced = exprs + .map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames)) + .map { + case e: Expression => UnresolvedAlias(e) + } + val aggs = aggNames.map( a => Alias(a._1, a._2)).toSeq + val props = propNames.map( p => Alias(p._1, p._2)).toSeq + + (replaced, aggs, props) + } + + /** Identifies and deduplicates aggregation functions and window properties. */ + private def identifyAggregationsAndProperties( + exp: Expression, + tableEnv: TableEnvironment, + aggNames: Map[Expression, String], + propNames: Map[Expression, String]) : (Map[Expression, String], Map[Expression, String]) = { + + exp match { + case agg: Aggregation => + if (aggNames contains agg) { + (aggNames, propNames) + } else { + (aggNames + (agg -> tableEnv.createUniqueAttributeName()), propNames) + } + case prop: WindowProperty => + if (propNames contains prop) { + (aggNames, propNames) + } else { + (aggNames, propNames + (prop -> tableEnv.createUniqueAttributeName())) + } + case l: LeafExpression => + (aggNames, propNames) + case u: UnaryExpression => + identifyAggregationsAndProperties(u.child, tableEnv, aggNames, propNames) + case b: BinaryExpression => + val l = identifyAggregationsAndProperties(b.left, tableEnv, aggNames, propNames) + identifyAggregationsAndProperties(b.right, tableEnv, l._1, l._2) + + // Functions calls + case c @ Call(name, args) => + args.foldLeft((aggNames, propNames)){ + (x, y) => identifyAggregationsAndProperties(y, tableEnv, x._1, x._2) + } + + case sfc @ ScalarFunctionCall(clazz, args) => + args.foldLeft((aggNames, propNames)){ + (x, y) => identifyAggregationsAndProperties(y, tableEnv, x._1, x._2) + } + + // General expression + case e: Expression => + e.productIterator.foldLeft((aggNames, propNames)){ + (x, y) => y match { + case e: Expression => identifyAggregationsAndProperties(e, tableEnv, x._1, x._2) + case _ => (x._1, x._2) + } + } + } + } + + /** Replaces aggregations and projections by named field references. */ + private def replaceAggregationsAndProperties( + exp: Expression, + tableEnv: TableEnvironment, + aggNames: Map[Expression, String], + propNames: Map[Expression, String]) : Expression = { exp match { case agg: Aggregation => - val name = tableEnv.createUniqueAttributeName() - val aggCall = Alias(agg, name) - val fieldExp = UnresolvedFieldReference(name) - (fieldExp, List(aggCall), Nil) + val name = aggNames(agg) + Alias(UnresolvedFieldReference(name), tableEnv.createUniqueAttributeName()) case prop: WindowProperty => - val name = tableEnv.createUniqueAttributeName() - val propCall = Alias(prop, name) - val fieldExp = UnresolvedFieldReference(name) - (fieldExp, Nil, List(propCall)) + val name = propNames(prop) + Alias(UnresolvedFieldReference(name), tableEnv.createUniqueAttributeName()) case n @ Alias(agg: Aggregation, name) => - val fieldExp = UnresolvedFieldReference(name) - (fieldExp, List(n), Nil) + val aName = aggNames(agg) + Alias(UnresolvedFieldReference(aName), name) case n @ Alias(prop: WindowProperty, name) => - val fieldExp = UnresolvedFieldReference(name) - (fieldExp, Nil, List(n)) - case l: LeafExpression => - (l, Nil, Nil) + val pName = propNames(prop) + Alias(UnresolvedFieldReference(pName), name) + case l: LeafExpression => l case u: UnaryExpression => - val c = extractAggregationsAndProperties(u.child, tableEnv) - (u.makeCopy(Array(c._1)), c._2, c._3) + val c = replaceAggregationsAndProperties(u.child, tableEnv, aggNames, propNames) + u.makeCopy(Array(c)) case b: BinaryExpression => - val l = extractAggregationsAndProperties(b.left, tableEnv) - val r = extractAggregationsAndProperties(b.right, tableEnv) - (b.makeCopy(Array(l._1, r._1)), - l._2 ::: r._2, - l._3 ::: r._3) + val l = replaceAggregationsAndProperties(b.left, tableEnv, aggNames, propNames) + val r = replaceAggregationsAndProperties(b.right, tableEnv, aggNames, propNames) + b.makeCopy(Array(l, r)) // Functions calls case c @ Call(name, args) => - val newArgs = args.map(extractAggregationsAndProperties(_, tableEnv)) - (c.makeCopy((name :: newArgs.map(_._1) :: Nil).toArray), - newArgs.flatMap(_._2).toList, - newArgs.flatMap(_._3).toList) + val newArgs = args.map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames)) + c.makeCopy(Array(name, newArgs)) case sfc @ ScalarFunctionCall(clazz, args) => - val newArgs = args.map(extractAggregationsAndProperties(_, tableEnv)) - (sfc.makeCopy((clazz :: newArgs.map(_._1) :: Nil).toArray), - newArgs.flatMap(_._2).toList, - newArgs.flatMap(_._3).toList) + val newArgs: Seq[Expression] = args + .map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames)) + sfc.makeCopy(Array(clazz,newArgs)) // General expression case e: Expression => val newArgs = e.productIterator.map { case arg: Expression => - extractAggregationsAndProperties(arg, tableEnv) + replaceAggregationsAndProperties(arg, tableEnv, aggNames, propNames) } - (e.makeCopy(newArgs.map(_._1).toArray), - newArgs.flatMap(_._2).toList, - newArgs.flatMap(_._3).toList) + e.makeCopy(newArgs.toArray) } } /** - * Parses all input expressions to [[UnresolvedAlias]]. - * And expands star to parent's full project list. + * Expands an UnresolvedFieldReference("*") to parent's full project list. */ - def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): Seq[NamedExpression] = { - val projectList = new ListBuffer[NamedExpression] + def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): Seq[Expression] = { + val projectList = new ListBuffer[Expression] exprs.foreach { case n: UnresolvedFieldReference if n.name == "*" => - projectList ++= parent.output.map(UnresolvedAlias(_)) - case e: Expression => projectList += UnresolvedAlias(e) + projectList ++= parent.output.map(a => UnresolvedFieldReference(a.name)) + case e: Expression => projectList += e } projectList } http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala index b9b4561..b4ae3ab 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala @@ -22,19 +22,21 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.common.functions.RichGroupReduceFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.api.table.expressions._ import org.apache.flink.api.table.plan.logical._ import org.apache.flink.api.table.plan.nodes.FlinkAggregate -import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream, createNonKeyedWindowedStream, transformToPropertyReads} +import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate._ import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._ import org.apache.flink.api.table.runtime.aggregate._ import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, RowTypeInfo, TimeIntervalTypeInfo, TypeConverter} -import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment} +import org.apache.flink.api.table.{TableException, FlinkTypeFactory, Row, StreamTableEnvironment} import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} +import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction} import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} @@ -115,8 +117,6 @@ class DataStreamAggregate( getRowType, grouping) - val propertyReads = transformToPropertyReads(namedProperties.map(_.property)) - val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan( tableEnv, // tell the input operator that this operator currently only supports Rows as input @@ -148,7 +148,8 @@ class DataStreamAggregate( val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " + s"window: ($window), " + s"select: ($aggString)" - val aggregateFunction = new AggregateWindowFunction(propertyReads, groupReduceFunction) + val aggregateFunction = + createWindowAggregationFunction(window, namedProperties, groupReduceFunction) val keyedStream = mappedInput.keyBy(groupingKeys: _*) @@ -164,7 +165,8 @@ class DataStreamAggregate( // global / non-keyed aggregation else { val aggOpName = s"window: ($window), select: ($aggString)" - val aggregateFunction = new AggregateAllWindowFunction(propertyReads, groupReduceFunction) + val aggregateFunction = + createAllWindowAggregationFunction(window, namedProperties, groupReduceFunction) val windowedStream = createNonKeyedWindowedStream(window, mappedInput) .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]] @@ -197,11 +199,69 @@ class DataStreamAggregate( object DataStreamAggregate { - private def transformToPropertyReads(namedProperties: Seq[WindowProperty]) - : Array[WindowPropertyRead[_ <: Any]] = namedProperties.map { - case WindowStart(_) => new WindowStartRead() - case WindowEnd(_) => new WindowEndRead() - }.toArray + private def createAllWindowAggregationFunction( + window: LogicalWindow, + properties: Seq[NamedWindowProperty], + aggFunction: RichGroupReduceFunction[Row, Row]) + : AllWindowFunction[Row, Row, DataStreamWindow] = { + + if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] + } else { + new AggregateAllWindowFunction(aggFunction) + } + + } + + private def createWindowAggregationFunction( + window: LogicalWindow, + properties: Seq[NamedWindowProperty], + aggFunction: RichGroupReduceFunction[Row, Row]) + : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { + + if (isTimeWindow(window)) { + val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + new AggregateTimeWindowFunction(aggFunction, startPos, endPos) + .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] + } else { + new AggregateWindowFunction(aggFunction) + } + + } + + private def isTimeWindow(window: LogicalWindow) = { + window match { + case ProcessingTimeTumblingGroupWindow(_, size) => isTimeInterval(size.resultType) + case ProcessingTimeSlidingGroupWindow(_, size, _) => isTimeInterval(size.resultType) + case ProcessingTimeSessionGroupWindow(_, _) => true + case EventTimeTumblingGroupWindow(_, _, size) => isTimeInterval(size.resultType) + case EventTimeSlidingGroupWindow(_, _, size, _) => isTimeInterval(size.resultType) + case EventTimeSessionGroupWindow(_, _, _) => true + } + } + + def computeWindowStartEndPropertyPos(properties: Seq[NamedWindowProperty]) + : (Option[Int], Option[Int]) = { + + val propPos = properties.foldRight((None: Option[Int], None: Option[Int], 0)) { + (p, x) => p match { + case NamedWindowProperty(name, prop) => + prop match { + case WindowStart(_) if x._1.isDefined => + throw new TableException("Duplicate WindowStart property encountered. This is a bug.") + case WindowStart(_) => + (Some(x._3), x._2, x._3 - 1) + case WindowEnd(_) if x._2.isDefined => + throw new TableException("Duplicate WindowEnd property encountered. This is a bug.") + case WindowEnd(_) => + (x._1, Some(x._3), x._3 - 1) + } + } + } + (propPos._1, propPos._2) + } private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple]) : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match { http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala new file mode 100644 index 0000000..ceadfe7 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.table.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +class AggregateAllTimeWindowFunction( + groupReduceFunction: RichGroupReduceFunction[Row, Row], + windowStartPos: Option[Int], + windowEndPos: Option[Int]) + + extends RichAllWindowFunction[Row, Row, TimeWindow] { + + private var collector: TimeWindowPropertyCollector = _ + + override def open(parameters: Configuration): Unit = { + groupReduceFunction.open(parameters) + collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos) + } + + override def apply(window: TimeWindow, input: Iterable[Row], out: Collector[Row]): Unit = { + + // set collector and window + collector.wrappedCollector = out + collector.timeWindow = window + + // call wrapped reduce function with property collector + groupReduceFunction.reduce(input, collector) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala index 86f8a20..53ab948 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala @@ -27,27 +27,14 @@ import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector -class AggregateAllWindowFunction( - propertyReads: Array[WindowPropertyRead[_ <: Any]], - groupReduceFunction: RichGroupReduceFunction[Row, Row]) - extends RichAllWindowFunction[Row, Row, Window] { - - private var propertyCollector: PropertyCollector = _ +class AggregateAllWindowFunction(groupReduceFunction: RichGroupReduceFunction[Row, Row]) + extends RichAllWindowFunction[Row, Row, Window] { override def open(parameters: Configuration): Unit = { groupReduceFunction.open(parameters) - propertyCollector = new PropertyCollector(propertyReads) } override def apply(window: Window, input: Iterable[Row], out: Collector[Row]): Unit = { - - // extract the properties from window - propertyReads.foreach(_.extract(window)) - - // set final collector - propertyCollector.finalCollector = out - - // call wrapped reduce function with property collector - groupReduceFunction.reduce(input, propertyCollector) + groupReduceFunction.reduce(input, out) } } http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala new file mode 100644 index 0000000..80f52ca --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.table.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +class AggregateTimeWindowFunction( + groupReduceFunction: RichGroupReduceFunction[Row, Row], + windowStartPos: Option[Int], + windowEndPos: Option[Int]) + extends RichWindowFunction[Row, Row, Tuple, TimeWindow] { + + private var collector: TimeWindowPropertyCollector = _ + + override def open(parameters: Configuration): Unit = { + groupReduceFunction.open(parameters) + collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos) + } + + override def apply( + key: Tuple, + window: TimeWindow, + input: Iterable[Row], + out: Collector[Row]) : Unit = { + + // set collector and window + collector.wrappedCollector = out + collector.timeWindow = window + + // call wrapped reduce function with property collector + groupReduceFunction.reduce(input, collector) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala index c65ac35..180248f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala @@ -28,32 +28,19 @@ import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector -class AggregateWindowFunction( - propertyReads: Array[WindowPropertyRead[_ <: Any]], - groupReduceFunction: RichGroupReduceFunction[Row, Row]) +class AggregateWindowFunction(groupReduceFunction: RichGroupReduceFunction[Row, Row]) extends RichWindowFunction[Row, Row, Tuple, Window] { - private var propertyCollector: PropertyCollector = _ - override def open(parameters: Configuration): Unit = { groupReduceFunction.open(parameters) - propertyCollector = new PropertyCollector(propertyReads) } override def apply( key: Tuple, window: Window, input: Iterable[Row], - out: Collector[Row]) - : Unit = { - - // extract the properties from window - propertyReads.foreach(_.extract(window)) - - // set final collector - propertyCollector.finalCollector = out + out: Collector[Row]) : Unit = { - // call wrapped reduce function with property collector - groupReduceFunction.reduce(input, propertyCollector) + groupReduceFunction.reduce(input, out) } } http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala deleted file mode 100644 index 763ed0b..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime.aggregate - -import org.apache.flink.api.table.Row -import org.apache.flink.util.Collector - -/** - * Adds properties to the end of a row before it emits it to the final collector. - * The collector assumes that the row has placeholders at the end that can be filled. - */ -class PropertyCollector(properties: Array[WindowPropertyRead[_ <: Any]]) extends Collector[Row] { - var finalCollector: Collector[Row] = _ - - override def collect(record: Row): Unit = { - var i: Int = 0 - while (i < properties.length) { - val idx = record.productArity - properties.length + i - record.setField(idx, properties(i).get()) - i = i + 1 - } - finalCollector.collect(record) - } - - override def close(): Unit = finalCollector.close() -} http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala new file mode 100644 index 0000000..9f1c23b --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.aggregate + +import org.apache.calcite.runtime.SqlFunctions +import org.apache.flink.api.table.Row +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.util.Collector + +/** + * Adds TimeWindow properties to specified fields of a row before it emits the row to a wrapped + * collector. + */ +class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffset: Option[Int]) + extends Collector[Row] { + + var wrappedCollector: Collector[Row] = _ + var timeWindow: TimeWindow = _ + + override def collect(record: Row): Unit = { + + val lastFieldPos = record.productArity - 1 + + if (windowStartOffset.isDefined) { + record.setField( + lastFieldPos + windowStartOffset.get, + SqlFunctions.internalToTimestamp(timeWindow.getStart)) + } + if (windowEndOffset.isDefined) { + record.setField( + lastFieldPos + windowEndOffset.get, + SqlFunctions.internalToTimestamp(timeWindow.getEnd)) + } + wrappedCollector.collect(record) + } + + override def close(): Unit = wrappedCollector.close() +} http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowEndRead.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowEndRead.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowEndRead.scala deleted file mode 100644 index dd12238..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowEndRead.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime.aggregate - -import java.sql.Timestamp - -import org.apache.calcite.runtime.SqlFunctions -import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window} - -class WindowEndRead extends WindowPropertyRead[Timestamp] { - - private var ts: Timestamp = _ - - override def extract(window: Window): Unit = window match { - case timeWindow: TimeWindow => - ts = SqlFunctions.internalToTimestamp(timeWindow.getEnd) - case _ => - ts = null - } - - override def get(): Timestamp = ts -} http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowPropertyRead.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowPropertyRead.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowPropertyRead.scala deleted file mode 100644 index 88ea30d..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowPropertyRead.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime.aggregate - -import org.apache.flink.streaming.api.windowing.windows.Window - -/** - * Base class for reading a window property. The property will be extracted once and - * can be read multiple times. - */ -trait WindowPropertyRead[T] extends Serializable { - - def extract(window: Window): Unit - - def get(): T - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowStartRead.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowStartRead.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowStartRead.scala deleted file mode 100644 index d92cb18..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/WindowStartRead.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.runtime.aggregate - -import java.sql.Timestamp - -import org.apache.calcite.runtime.SqlFunctions -import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window} - -class WindowStartRead extends WindowPropertyRead[Timestamp] { - - private var ts: Timestamp = _ - - override def extract(window: Window): Unit = window match { - case timeWindow: TimeWindow => - ts = SqlFunctions.internalToTimestamp(timeWindow.getStart) - case _ => - ts = null - } - - override def get(): Timestamp = ts -} http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index ebb4dcb..8528c8a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -76,16 +76,21 @@ class Table( * }}} */ def select(fields: Expression*): Table = { - val projectionOnAggregates = fields.map(extractAggregationsAndProperties(_, tableEnv)) - val aggregations = projectionOnAggregates.flatMap(_._2) - val projectList = expandProjectList(projectionOnAggregates.map(_._1), logicalPlan) - if (aggregations.nonEmpty) { + + val expandedFields = expandProjectList(fields, logicalPlan) + val (projection, aggs, props) = extractAggregationsAndProperties(expandedFields, tableEnv) + + if (props.nonEmpty) { + throw ValidationException("Window properties can only be used on windowed tables.") + } + + if (aggs.nonEmpty) { new Table(tableEnv, - Project(projectList, - Aggregate(Nil, aggregations, logicalPlan).validate(tableEnv)).validate(tableEnv)) + Project(projection, + Aggregate(Nil, aggs, logicalPlan).validate(tableEnv)).validate(tableEnv)) } else { new Table(tableEnv, - Project(projectList, logicalPlan).validate(tableEnv)) + Project(projection, logicalPlan).validate(tableEnv)) } } @@ -672,20 +677,18 @@ class GroupedTable( */ def select(fields: Expression*): Table = { - val projectionOnAggsAndProps = fields.map(extractAggregationsAndProperties(_, table.tableEnv)) - val aggregations = projectionOnAggsAndProps.flatMap(_._2) - val properties = projectionOnAggsAndProps.flatMap(_._3) + val (projection, aggs, props) = extractAggregationsAndProperties(fields, table.tableEnv) - if (properties.nonEmpty) { + if (props.nonEmpty) { throw ValidationException("Window properties can only be used on windowed tables.") } val logical = Project( - projectionOnAggsAndProps.map(e => UnresolvedAlias(e._1)), + projection, Aggregate( groupKey, - aggregations, + aggs, table.logicalPlan ).validate(table.tableEnv) ).validate(table.tableEnv) @@ -744,20 +747,19 @@ class GroupWindowedTable( * }}} */ def select(fields: Expression*): Table = { - val projectionOnAggsAndProps = fields.map(extractAggregationsAndProperties(_, table.tableEnv)) - val aggregations = projectionOnAggsAndProps.flatMap(_._2) - val properties = projectionOnAggsAndProps.flatMap(_._3) + + val (projection, aggs, props) = extractAggregationsAndProperties(fields, table.tableEnv) val groupWindow = window.toLogicalWindow val logical = Project( - projectionOnAggsAndProps.map(e => UnresolvedAlias(e._1)), + projection, WindowAggregate( groupKey, groupWindow, - properties, - aggregations, + props, + aggs, table.logicalPlan ).validate(table.tableEnv) ).validate(table.tableEnv) http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala index 6bb513e..2ccbb38 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala @@ -160,21 +160,21 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { val windowedTable = table .groupBy('string) .window(Slide over 10.milli every 5.milli on 'rowtime as 'w) - .select('string, 'int.count, 'w.start, 'w.end) + .select('string, 'int.count, 'w.start, 'w.end, 'w.start) val results = windowedTable.toDataStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() val expected = Seq( - "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01", - "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015", - "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02", - "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025", - "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005", - "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01", - "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005", - "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01") + "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0", + "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015,1970-01-01 00:00:00.005", + "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02,1970-01-01 00:00:00.01", + "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025,1970-01-01 00:00:00.015", + "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995", + "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0", + "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995", + "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } } http://git-wip-us.apache.org/repos/asf/flink/blob/de03e0ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala index 96fd787..b59b151 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala @@ -586,7 +586,7 @@ class GroupWindowTest extends TableTestBase { val windowedTable = table .groupBy('string) .window(Session withGap 3.milli on 'rowtime as 'w) - .select('w.end, 'string, 'int.count, 'w.start, 'w.end) + .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2) val expected = unaryNode( "DataStreamCalc", @@ -603,10 +603,50 @@ class GroupWindowTest extends TableTestBase { "string", "COUNT(int) AS TMP_1", "end(WindowReference(w)) AS TMP_0", - "start(WindowReference(w)) AS TMP_2", - "end(WindowReference(w)) AS TMP_3") + "start(WindowReference(w)) AS TMP_2") ), - term("select", "TMP_0", "string", "TMP_1", "TMP_2", "TMP_3") + term("select", "TMP_0 AS we1", "string", "TMP_1 AS cnt", "TMP_2 AS ws", "TMP_0 AS we2") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testTumbleWindowWithDuplicateAggsAndProps(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Tumble over 5.millis on 'rowtime as 'w) + .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2, + 'w.end as 'x3, 'w.end) + + val expected = unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("groupBy", "string"), + term("window", + EventTimeTumblingGroupWindow( + Some(WindowReference("w")), + RowtimeAttribute(), + 5.millis)), + term("select", + "string", + "SUM(int) AS TMP_0", + "start(WindowReference(w)) AS TMP_1", + "end(WindowReference(w)) AS TMP_2") + ), + term("select", + "string", + "+(CAST(AS(TMP_0, 'TMP_3')), CAST(1)) AS s1", + "+(CAST(AS(TMP_0, 'TMP_4')), CAST(3)) AS s2", + "TMP_1 AS x", + "TMP_1 AS x2", + "TMP_2 AS x3", + "TMP_2 AS TMP_5") ) util.verifyTable(windowedTable, expected)
