Repository: flink
Updated Branches:
  refs/heads/master c31f95cab -> 728c936dd
[FLINK-5219] [table] Add non-grouped session windows for batch tables.

This closes #3266.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/728c936d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/728c936d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/728c936d

Branch: refs/heads/master
Commit: 728c936dd2ac18701e1d8696da251aec351b2ae6
Parents: c31f95c
Author: 金竹 <[email protected]>
Authored: Fri Mar 3 10:39:37 2017 +0800
Committer: Fabian Hueske <[email protected]>
Committed: Fri Mar 3 16:52:13 2017 +0100

----------------------------------------------------------------------
 .../nodes/dataset/DataSetWindowAggregate.scala  |  96 ++++++---
 .../table/runtime/aggregate/AggregateUtil.scala |  57 +++++-
 ...ionWindowAggregateCombineGroupFunction.scala | 168 ----------------
 ...aSetSessionWindowAggregatePreProcessor.scala | 197 +++++++++++++++++++
 .../dataset/DataSetWindowAggregateITCase.scala  |  21 +-
 5 files changed, 331 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index 597be8c..fb5ff3b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -194,33 +194,32 @@ class DataSetWindowAggregate(
     val groupingKeys = grouping.indices.toArray
     val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
-    // grouping window
-    if (groupingKeys.length > 0) {
-      // create mapFunction for initializing the aggregations
-      val mapFunction = createDataSetWindowPrepareMapFunction(
-        window,
-        namedAggregates,
-        grouping,
-        inputType,
-        isParserCaseSensitive)
-
-      val mappedInput = inputDS.map(mapFunction).name(prepareOperatorName)
+    // create mapFunction for initializing the aggregations
+    val mapFunction = createDataSetWindowPrepareMapFunction(
+      window,
+      namedAggregates,
+      grouping,
+      inputType,
+      isParserCaseSensitive)
 
-      val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
+    val mappedInput = inputDS.map(mapFunction).name(prepareOperatorName)
 
-      // the position of the rowtime field in the intermediate result for map output
-      val rowTimeFieldPos = mapReturnType.getArity - 1
+    val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
 
-      // do incremental aggregation
-      if (doAllSupportPartialMerge(
-        namedAggregates.map(_.getKey),
-        inputType,
-        grouping.length)) {
+    // the position of the rowtime field in the intermediate result for map output
+    val rowTimeFieldPos = mapReturnType.getArity - 1
 
-        // gets the window-start and window-end position in the intermediate result.
-        val windowStartPos = rowTimeFieldPos
-        val windowEndPos = windowStartPos + 1
+    // do incremental aggregation
+    if (doAllSupportPartialMerge(
+      namedAggregates.map(_.getKey),
+      inputType,
+      grouping.length)) {
 
+      // gets the window-start and window-end position in the intermediate result.
+      val windowStartPos = rowTimeFieldPos
+      val windowEndPos = windowStartPos + 1
+      // grouping window
+      if (groupingKeys.length > 0) {
         // create groupCombineFunction for combine the aggregations
         val combineGroupFunction = createDataSetWindowAggregationCombineFunction(
           window,
@@ -248,9 +247,38 @@ class DataSetWindowAggregate(
           .reduceGroup(groupReduceFunction)
           .returns(rowTypeInfo)
           .name(aggregateOperatorName)
+      } else {
+        // non-grouping window
+        val mapPartitionFunction = createDataSetWindowAggregationMapPartitionFunction(
+          window,
+          namedAggregates,
+          inputType,
+          grouping)
+
+        // create groupReduceFunction for calculating the aggregations
+        val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
+          window,
+          namedAggregates,
+          inputType,
+          rowRelDataType,
+          grouping,
+          namedProperties,
+          isInputCombined = true)
+
+        mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING)
+          .mapPartition(mapPartitionFunction)
+          .sortPartition(windowStartPos, Order.ASCENDING).setParallelism(1)
+          .sortPartition(windowEndPos, Order.ASCENDING).setParallelism(1)
+          .reduceGroup(groupReduceFunction)
+          .returns(rowTypeInfo)
+          .name(aggregateOperatorName)
+          .asInstanceOf[DataSet[Row]]
       }
-      // do non-incremental aggregation
-      else {
+    // do non-incremental aggregation
+    } else {
+      // grouping window
+      if (groupingKeys.length > 0) {
+        // create groupReduceFunction for calculating the aggregations
         val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
           window,
@@ -265,13 +293,23 @@ class DataSetWindowAggregate(
           .reduceGroup(groupReduceFunction)
           .returns(rowTypeInfo)
           .name(aggregateOperatorName)
+      } else {
+        // non-grouping window
+        val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
+          window,
+          namedAggregates,
+          inputType,
+          rowRelDataType,
+          grouping,
+          namedProperties)
+
+        mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING).setParallelism(1)
+          .reduceGroup(groupReduceFunction)
+          .returns(rowTypeInfo)
+          .name(aggregateOperatorName)
+          .asInstanceOf[DataSet[Row]]
       }
     }
-    // non-grouping window
-    else {
-      throw new UnsupportedOperationException(
-        "Session non-grouping windows on event-time are currently not supported.")
-    }
   }
 
   private def prepareOperatorName: String = {
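For context, the query shape that the new non-grouping branch enables looks as follows. This is a minimal sketch, assuming the environment setup and the `data` collection of the ITCase further below; it is not code from the commit itself:

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    // A session window aggregation with no grouping keys besides the window
    // itself, i.e. the "non-grouping window" path added above.
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val result = env
      .fromCollection(data) // `data` as in DataSetWindowAggregateITCase below
      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
      .window(Session withGap 2.milli on 'long as 'w) // event-time session window
      .groupBy('w)                                    // window-only grouping
      .select('string.count, 'w.start, 'w.end)

Before this change, this plan could only be translated when additional grouping keys were present; the non-grouped case threw the UnsupportedOperationException removed above.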
http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 40468ad..d549c37 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun._
-import org.apache.flink.api.common.functions.{InvalidTypesException, MapFunction, RichGroupCombineFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
+import org.apache.flink.api.common.functions.{GroupCombineFunction, InvalidTypesException, MapFunction, MapPartitionFunction, RichGroupReduceFunction, AggregateFunction => ApiAggregateFunction}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -246,6 +246,57 @@ object AggregateUtil {
   }
 
   /**
+    * Create a [[org.apache.flink.api.common.functions.MapPartitionFunction]] that performs
+    * aggregation for aggregates.
+    * The function returns aggregate values of all aggregate functions, organized in the
+    * following format:
+    *
+    * {{{
+    *   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
+    *        |                                   |     windowEnd(max(rowtime)
+    *        |                                   |             |
+    *        v                                   v             v
+    *   +--------+--------+--------+--------+-----------+---------+
+    *   |  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
+    *   +--------+--------+--------+--------+-----------+---------+
+    *                         ^                    ^
+    *                         |                    |
+    *   sum(y) aggOffsetInRow = 4      windowStart(min(rowtime))
+    *
+    * }}}
+    *
+    */
+  def createDataSetWindowAggregationMapPartitionFunction(
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      groupings: Array[Int]): MapPartitionFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      0)._2
+
+    window match {
+      case EventTimeSessionGroupWindow(_, _, gap) =>
+        val combineReturnType: RowTypeInfo =
+          createDataSetAggregateBufferDataType(
+            groupings,
+            aggregates,
+            inputType,
+            Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
+
+        new DataSetSessionWindowAggregatePreProcessor(
+          aggregates,
+          groupings,
+          asLong(gap),
+          combineReturnType)
+      case _ =>
+        throw new UnsupportedOperationException(s"$window is currently not supported on batch")
+    }
+  }
+
+  /**
     * Create a [[org.apache.flink.api.common.functions.GroupCombineFunction]] that performs
     * pre-aggregation for aggregates.
    * The function returns intermediate aggregate values of all aggregate functions which are
@@ -268,7 +319,7 @@ object AggregateUtil {
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
       groupings: Array[Int])
-    : RichGroupCombineFunction[Row, Row] = {
+    : GroupCombineFunction[Row, Row] = {
 
     val aggregates = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
@@ -284,7 +335,7 @@ object AggregateUtil {
         inputType,
         Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
 
-    new DataSetSessionWindowAggregateCombineGroupFunction(
+    new DataSetSessionWindowAggregatePreProcessor(
       aggregates,
       groupings,
       asLong(gap),
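The return type here loosens from RichGroupCombineFunction to the GroupCombineFunction interface because the new pre-processor cannot extend a single rich base class any more: the same object must be usable both as a combiner (grouped plan) and as a map-partition function (non-grouped plan). A minimal sketch of that pattern; the class name SharedPreProcessor is illustrative, not from the commit:

    import java.lang.Iterable
    import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction, MapPartitionFunction}
    import org.apache.flink.util.Collector

    // One rich function that can be handed either to combineGroup (grouped
    // plan) or to mapPartition (non-grouped plan); both entry points funnel
    // into the same shared logic.
    abstract class SharedPreProcessor[IN, OUT] extends AbstractRichFunction
      with GroupCombineFunction[IN, OUT]
      with MapPartitionFunction[IN, OUT] {

      override def combine(records: Iterable[IN], out: Collector[OUT]): Unit =
        preProcess(records, out)

      override def mapPartition(records: Iterable[IN], out: Collector[OUT]): Unit =
        preProcess(records, out)

      // the real class puts its session-merging logic here
      protected def preProcess(records: Iterable[IN], out: Collector[OUT]): Unit
    }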
http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
deleted file mode 100644
index 88cd19f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.lang.Iterable
-import java.util.{ArrayList => JArrayList}
-
-import org.apache.flink.api.common.functions.RichGroupCombineFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.types.Row
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
-import org.apache.flink.util.{Collector, Preconditions}
-
-/**
- * This wraps the aggregate logic inside of
- * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
- *
- * @param aggregates The aggregate functions.
- * @param groupingKeys The indexes of the grouping fields.
- * @param gap Session time window gap.
- * @param intermediateRowType Intermediate row data type.
- */
-class DataSetSessionWindowAggregateCombineGroupFunction(
-    aggregates: Array[AggregateFunction[_ <: Any]],
-    groupingKeys: Array[Int],
-    gap: Long,
-    @transient intermediateRowType: TypeInformation[Row])
-  extends RichGroupCombineFunction[Row, Row] with ResultTypeQueryable[Row] {
-
-  private var aggregateBuffer: Row = _
-  private val accumStartPos: Int = groupingKeys.length
-  private val rowTimeFieldPos = accumStartPos + aggregates.length
-
-  val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
-    new JArrayList[Accumulator](2)
-  }
-
-  override def open(config: Configuration) {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupingKeys)
-    aggregateBuffer = new Row(rowTimeFieldPos + 2)
-
-    // init lists with two empty accumulators
-    for (i <- aggregates.indices) {
-      val accumulator = aggregates(i).createAccumulator()
-      accumulatorList(i).add(accumulator)
-      accumulatorList(i).add(accumulator)
-    }
-  }
-
-  /**
-   * For sub-grouped intermediate aggregate Rows, divide the windows based on the rowtime
-   * (current rowtime - previous rowtime > gap), and then merge data (within a unified window)
-   * into an aggregate buffer.
-   *
-   * @param records Sub-grouped intermediate aggregate Rows.
-   * @return Combined intermediate aggregate Row.
-   *
-   */
-  override def combine(records: Iterable[Row], out: Collector[Row]): Unit = {
-
-    var windowStart: java.lang.Long = null
-    var windowEnd: java.lang.Long = null
-    var currentRowTime: java.lang.Long = null
-
-    // reset first accumulator in merge list
-    for (i <- aggregates.indices) {
-      val accumulator = aggregates(i).createAccumulator()
-      accumulatorList(i).set(0, accumulator)
-    }
-
-    val iterator = records.iterator()
-
-    while (iterator.hasNext) {
-      val record = iterator.next()
-      currentRowTime = record.getField(rowTimeFieldPos).asInstanceOf[Long]
-      // initial traversal or opening a new window
-      if (windowEnd == null || (windowEnd != null && (currentRowTime > windowEnd))) {
-
-        // calculate the current window and open a new window.
-        if (windowEnd != null) {
-          // emit the current window's merged data
-          doCollect(out, accumulatorList, windowStart, windowEnd)
-
-          // reset first value of accumulator list
-          for (i <- aggregates.indices) {
-            val accumulator = aggregates(i).createAccumulator()
-            accumulatorList(i).set(0, accumulator)
-          }
-        } else {
-          // set group keys to aggregateBuffer.
-          for (i <- groupingKeys.indices) {
-            aggregateBuffer.setField(i, record.getField(i))
-          }
-        }
-
-        windowStart = record.getField(rowTimeFieldPos).asInstanceOf[Long]
-      }
-
-      for (i <- aggregates.indices) {
-        // insert received accumulator into acc list
-        val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
-        accumulatorList(i).set(1, newAcc)
-        // merge acc list
-        val retAcc = aggregates(i).merge(accumulatorList(i))
-        // insert result into acc list
-        accumulatorList(i).set(0, retAcc)
-      }
-
-      // the current rowtime is the last rowtime of the next calculation.
-      windowEnd = currentRowTime + gap
-    }
-    // emit the merged data of the current window.
-    doCollect(out, accumulatorList, windowStart, windowEnd)
-  }
-
-  /**
-   * Emit the merged data of the current window.
-   *
-   * @param out the collection of the aggregate results
-   * @param accumulatorList an array (indexed by aggregate index) of the accumulator lists for
-   *        each aggregate
-   * @param windowStart the window's start attribute value is the min (rowtime)
-   *        of all rows in the window.
-   * @param windowEnd the window's end property value is max (rowtime) + gap
-   *        for all rows in the window.
-   */
-  def doCollect(
-      out: Collector[Row],
-      accumulatorList: Array[JArrayList[Accumulator]],
-      windowStart: Long,
-      windowEnd: Long): Unit = {
-
-    // merge the accumulators into one accumulator
-    for (i <- aggregates.indices) {
-      aggregateBuffer.setField(accumStartPos + i, accumulatorList(i).get(0))
-    }
-
-    // intermediate Row WindowStartPos is rowtime pos.
-    aggregateBuffer.setField(rowTimeFieldPos, windowStart)
-
-    // intermediate Row WindowEndPos is rowtime pos + 1.
-    aggregateBuffer.setField(rowTimeFieldPos + 1, windowEnd)
-
-    out.collect(aggregateBuffer)
-  }
-
-  override def getProducedType: TypeInformation[Row] = {
-    intermediateRowType
-  }
-}
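Both the deleted combiner above and its replacement below rely on the same two-slot accumulator trick: slot 0 of a pre-sized JArrayList holds the running accumulator, slot 1 receives each incoming accumulator, and merge collapses the pair back into slot 0, so no list is allocated per record. A standalone sketch of the idea, in plain Scala with a toy sum standing in for a real table AggregateFunction:

    import java.util.{ArrayList => JArrayList}

    // Toy stand-in for an accumulator type and its merge function.
    final class SumAcc(var sum: Long)

    def mergeSlots(acc: JArrayList[SumAcc]): SumAcc =
      new SumAcc(acc.get(0).sum + acc.get(1).sum)

    val slots = new JArrayList[SumAcc](2)
    slots.add(new SumAcc(0)) // slot 0: running accumulator
    slots.add(new SumAcc(0)) // slot 1: scratch for each incoming accumulator

    for (incoming <- Seq(1L, 2L, 3L)) {
      slots.set(1, new SumAcc(incoming)) // insert received accumulator
      slots.set(0, mergeSlots(slots))    // merge and store back into slot 0
    }
    assert(slots.get(0).sum == 6L)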
http://git-wip-us.apache.org/repos/asf/flink/blob/728c936d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
new file mode 100644
index 0000000..a299c40
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction, MapPartitionFunction, RichGroupCombineFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * This wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupCombineOperator]] and
+ * [[org.apache.flink.api.java.operators.MapPartitionOperator]].
+ *
+ * @param aggregates The aggregate functions.
+ * @param groupingKeys The indexes of the grouping fields.
+ * @param gap Session time window gap.
+ * @param intermediateRowType Intermediate row data type.
+ */
+class DataSetSessionWindowAggregatePreProcessor(
+    aggregates: Array[AggregateFunction[_ <: Any]],
+    groupingKeys: Array[Int],
+    gap: Long,
+    @transient intermediateRowType: TypeInformation[Row])
+  extends AbstractRichFunction
+  with MapPartitionFunction[Row, Row]
+  with GroupCombineFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  private var aggregateBuffer: Row = _
+  private val accumStartPos: Int = groupingKeys.length
+  private val rowTimeFieldPos = accumStartPos + aggregates.length
+
+  val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
+    new JArrayList[Accumulator](2)
+  }
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(groupingKeys)
+    aggregateBuffer = new Row(rowTimeFieldPos + 2)
+
+    // init lists with two empty accumulators
+    for (i <- aggregates.indices) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).add(accumulator)
+      accumulatorList(i).add(accumulator)
+    }
+  }
+
+  /**
+   * For sub-grouped intermediate aggregate Rows, divide the windows based on the rowtime
+   * (current rowtime - previous rowtime > gap), and then merge data (within a unified window)
+   * into an aggregate buffer.
+   *
+   * @param records Sub-grouped intermediate aggregate Rows.
+   * @return Combined intermediate aggregate Row.
+   *
+   */
+  override def combine(records: Iterable[Row], out: Collector[Row]): Unit = {
+    preProcessing(records, out)
+  }
+
+  /**
+   * Divide the windows based on the rowtime
+   * (current rowtime - previous rowtime > gap), and then merge data (within a unified window)
+   * into an aggregate buffer.
+   *
+   * @param records Intermediate aggregate Rows.
+   * @return Pre-partitioned intermediate aggregate Rows.
+   *
+   */
+  override def mapPartition(records: Iterable[Row], out: Collector[Row]): Unit = {
+    preProcessing(records, out)
+  }
+
+  /**
+   * Divide the intermediate aggregate Rows into windows based on the rowtime
+   * (current rowtime - previous rowtime > gap), and then merge data (within a unified window)
+   * into an aggregate buffer.
+   *
+   * @param records Intermediate aggregate Rows.
+   * @return Pre-processed intermediate aggregate Rows.
+   *
+   */
+  private def preProcessing(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+    var windowStart: java.lang.Long = null
+    var windowEnd: java.lang.Long = null
+    var currentRowTime: java.lang.Long = null
+
+    // reset first accumulator in merge list
+    for (i <- aggregates.indices) {
+      val accumulator = aggregates(i).createAccumulator()
+      accumulatorList(i).set(0, accumulator)
+    }
+
+    val iterator = records.iterator()
+
+    while (iterator.hasNext) {
+      val record = iterator.next()
+      currentRowTime = record.getField(rowTimeFieldPos).asInstanceOf[Long]
+      // initial traversal or opening a new window
+      if (windowEnd == null || (windowEnd != null && (currentRowTime > windowEnd))) {
+
+        // calculate the current window and open a new window.
+        if (windowEnd != null) {
+          // emit the current window's merged data
+          doCollect(out, accumulatorList, windowStart, windowEnd)
+
+          // reset first value of accumulator list
+          for (i <- aggregates.indices) {
+            val accumulator = aggregates(i).createAccumulator()
+            accumulatorList(i).set(0, accumulator)
+          }
+        } else {
+          // set group keys to aggregateBuffer.
+          for (i <- groupingKeys.indices) {
+            aggregateBuffer.setField(i, record.getField(i))
+          }
+        }
+
+        windowStart = record.getField(rowTimeFieldPos).asInstanceOf[Long]
+      }
+
+      for (i <- aggregates.indices) {
+        // insert received accumulator into acc list
+        val newAcc = record.getField(accumStartPos + i).asInstanceOf[Accumulator]
+        accumulatorList(i).set(1, newAcc)
+        // merge acc list
+        val retAcc = aggregates(i).merge(accumulatorList(i))
+        // insert result into acc list
+        accumulatorList(i).set(0, retAcc)
+      }
+
+      // the current rowtime is the last rowtime of the next calculation.
+      windowEnd = currentRowTime + gap
+    }
+    // emit the merged data of the current window.
+    doCollect(out, accumulatorList, windowStart, windowEnd)
+  }
+
+  /**
+   * Emit the merged data of the current window.
+   *
+   * @param out the collection of the aggregate results
+   * @param accumulatorList an array (indexed by aggregate index) of the accumulator lists for
+   *        each aggregate
+   * @param windowStart the window's start attribute value is the min (rowtime)
+   *        of all rows in the window.
+   * @param windowEnd the window's end property value is max (rowtime) + gap
+   *        for all rows in the window.
+   */
+  def doCollect(
+      out: Collector[Row],
+      accumulatorList: Array[JArrayList[Accumulator]],
+      windowStart: Long,
+      windowEnd: Long): Unit = {
+
+    // merge the accumulators into one accumulator
+    for (i <- aggregates.indices) {
+      aggregateBuffer.setField(accumStartPos + i, accumulatorList(i).get(0))
+    }
+
+    // intermediate Row WindowStartPos is rowtime pos.
+    aggregateBuffer.setField(rowTimeFieldPos, windowStart)
+
+    // intermediate Row WindowEndPos is rowtime pos + 1.
+    aggregateBuffer.setField(rowTimeFieldPos + 1, windowEnd)
+
+    out.collect(aggregateBuffer)
+  }
+
+  override def getProducedType: TypeInformation[Row] = {
+    intermediateRowType
+  }
+}
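The windowing rule that preProcessing applies can be summarized in isolation: scan the rowtime-sorted input, extend the open window while a row's time is at most the current window end (last rowtime + gap), and otherwise emit and open a new window at that row. A compact, self-contained sketch in plain Scala (not the commit's code) that reproduces the window bounds asserted in the updated test below:

    // Session windows over event times: each window ends gap ms after its
    // last row. Returns (start, end) pairs in window order.
    def sessionize(rowtimes: Seq[Long], gap: Long): Vector[(Long, Long)] =
      rowtimes.sorted.foldLeft(Vector.empty[(Long, Long)]) { (windows, t) =>
        windows.lastOption match {
          case Some((start, end)) if t <= end =>
            windows.init :+ ((start, t + gap)) // row falls into the open window
          case _ =>
            windows :+ ((t, t + gap))          // gap exceeded: open a new window
        }
      }

    // Rowtimes of the test data below with gap = 2 yield the three sessions
    // the non-grouped test expects:
    assert(sessionize(Seq(1L, 2L, 3L, 7L, 4L, 16L, 8L), 2L) ==
      Vector((1L, 6L), (7L, 10L), (16L, 18L)))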
00:00:00.008" TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[UnsupportedOperationException]) - def testAlldEventTimeSessionGroupWindow(): Unit = { - // Non-grouping Session window on event-time are currently not supported + @Test + def testAllEventTimeSessionGroupWindow(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val table = env .fromCollection(data) .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) - val windowedTable =table - .window(Session withGap 7.milli on 'long as 'w) + + val results =table + .window(Session withGap 2.milli on 'long as 'w) .groupBy('w) - .select('string.count).toDataSet[Row].collect() + .select('string.count, 'w.start, 'w.end).toDataSet[Row].collect() + + val expected = "4,1970-01-01 00:00:00.001,1970-01-01 00:00:00.006\n" + + "2,1970-01-01 00:00:00.007,1970-01-01 00:00:00.01\n" + + "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.018" + TestBaseUtils.compareResultAsText(results.asJava, expected) } @Test(expected = classOf[ValidationException])
