[ https://issues.apache.org/jira/browse/FLINK-5990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938638#comment-15938638 ]
ASF GitHub Bot commented on FLINK-5990:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3585#discussion_r107670323

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -130,32 +167,72 @@ class DataStreamOverAggregate(
         val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]

         val result: DataStream[Row] =
    -      // partitioned aggregation
    -      if (partitionKeys.nonEmpty) {
    -        val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
    -          namedAggregates,
    -          inputType)
    +    // partitioned aggregation
    +    if (partitionKeys.nonEmpty) {
    +      val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
    +        namedAggregates,
    +        inputType)

    -        inputDS
    +      inputDS
            .keyBy(partitionKeys: _*)
            .process(processFunction)
            .returns(rowTypeInfo)
            .name(aggOpName)
            .asInstanceOf[DataStream[Row]]
    -      }
    -      // non-partitioned aggregation
    -      else {
    -        val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
    -          namedAggregates,
    -          inputType,
    -          false)
    -
    -        inputDS
    -          .process(processFunction).setParallelism(1).setMaxParallelism(1)
    -          .returns(rowTypeInfo)
    -          .name(aggOpName)
    -          .asInstanceOf[DataStream[Row]]
    -      }
    +    }
    +    // non-partitioned aggregation
    +    else {
    +      val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
    +        namedAggregates,
    +        inputType,
    +        false)
    +
    +      inputDS
    +        .process(processFunction).setParallelism(1).setMaxParallelism(1)
    +        .returns(rowTypeInfo)
    +        .name(aggOpName)
    +        .asInstanceOf[DataStream[Row]]
    +    }
    +    result
    +  }
    +
    +  def createRowsClauseBoundedAndCurrentRowOverWindow(
    +    inputDS: DataStream[Row],
    +    isRowTimeType: Boolean = false): DataStream[Row] = {
    +
    +    val overWindow: Group = logicWindow.groups.get(0)
    +    val partitionKeys: Array[Int] = overWindow.keys.toArray
    +    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
    +    val inputFields = (0 until inputType.getFieldCount).toArray
    +
    +    val precedingOffset =
    +      getLowerBoundary(logicWindow, overWindow, getInput()) + 1
    +
    +    // get the output types
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
    +
    +    val result: DataStream[Row] =
    +      // partitioned aggregation
    +      if (partitionKeys.nonEmpty) {
    +        val processFunction = AggregateUtil.createRowsClauseBoundedOverProcessFunction(
    +          namedAggregates,
    +          inputType,
    +          inputFields,
    +          precedingOffset,
    +          isRowTimeType
    +        )
    +        inputDS
    +          .keyBy(partitionKeys: _*)
    +          .process(processFunction)
    +          .returns(rowTypeInfo)
    +          .name(aggOpName)
    +          .asInstanceOf[DataStream[Row]]
    +      }
    +      // non-partitioned aggregation
    +      else {
    +        throw TableException(
    --- End diff --

    Isn't the non-partitioned case analogous if we use `NullByteKeyExtractor`?
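For illustration, a minimal sketch of what that suggestion could look like: key the non-partitioned stream by a constant so the keyed process function of the partitioned branch can be reused. `ConstantByteKeySelector` is a hypothetical stand-in for the `NullByteKeyExtractor` mentioned above (the class in the PR may be named and typed differently), and `inputDS`, `processFunction`, `rowTypeInfo`, and `aggOpName` are assumed to be in scope as in the surrounding method.

{code}
// Hypothetical sketch, not the PR's code: a constant-key selector that routes
// every row to the same key, so the keyed ProcessFunction used for the
// partitioned branch can also serve the non-partitioned case.
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.types.Row

class ConstantByteKeySelector extends KeySelector[Row, java.lang.Byte] {
  // All records map to key 0, i.e. they end up in a single key group.
  override def getKey(value: Row): java.lang.Byte = java.lang.Byte.valueOf(0.toByte)
}
{code}

The non-partitioned branch could then mirror the partitioned one instead of throwing:

{code}
// Sketch under the same assumptions: key by a constant and run with
// parallelism 1, analogous to the existing non-partitioned unbounded case.
inputDS
  .keyBy(new ConstantByteKeySelector)
  .process(processFunction)
  .setParallelism(1)
  .setMaxParallelism(1)
  .returns(rowTypeInfo)
  .name(aggOpName)
  .asInstanceOf[DataStream[Row]]
{code}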
> Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-5990
>                 URL: https://issues.apache.org/jira/browse/FLINK-5990
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER ROWS aggregations on event time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is required.
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a parameterless scalar function that just indicates event time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5803).
> - FOLLOWING is not supported.
> These restrictions will be resolved in follow-up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates.
> - Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
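To make the "Design of the DataStream operator to compute OVER ROW aggregates" item above a bit more concrete, here is a minimal, hypothetical sketch of the per-key logic for the example query's SUM(b) OVER (ROWS BETWEEN 2 PRECEDING AND CURRENT ROW). It is not the PR's process function: it assumes rows arrive per key already in event-time order and that column b is a BIGINT, and it leaves out the timer-based buffering and sorting that a real event-time operator needs.

{code}
// Illustrative sketch only: a keyed ProcessFunction that keeps at most
// precedingOffset + 1 rows per key and emits the input row extended by the
// running SUM over that ROWS window.
import java.util.{ArrayList => JArrayList, List => JList}

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

class RowsBoundedSumSketch(
    precedingOffset: Int,   // e.g. 2 for "2 PRECEDING"
    sumFieldIndex: Int)     // position of column b in the input row
  extends ProcessFunction[Row, Row] {

  // Keyed state holding the current row plus its preceding rows.
  private var buffer: ValueState[JList[Row]] = _

  override def open(parameters: Configuration): Unit = {
    buffer = getRuntimeContext.getState(
      new ValueStateDescriptor[JList[Row]]("rowsBuffer", new ListTypeInfo[Row](classOf[Row])))
  }

  override def processElement(
      input: Row,
      ctx: ProcessFunction[Row, Row]#Context,
      out: Collector[Row]): Unit = {

    // Append the current row and evict rows that fall out of the ROWS window.
    val rows = Option(buffer.value()).getOrElse(new JArrayList[Row]())
    rows.add(input)
    while (rows.size() > precedingOffset + 1) {
      rows.remove(0)
    }
    buffer.update(rows)

    // Aggregate over the buffered window.
    var sum = 0L
    var i = 0
    while (i < rows.size()) {
      sum += rows.get(i).getField(sumFieldIndex).asInstanceOf[Long]
      i += 1
    }

    // Forward all input fields and append the aggregate value.
    val output = new Row(input.getArity + 1)
    var j = 0
    while (j < input.getArity) {
      output.setField(j, input.getField(j))
      j += 1
    }
    output.setField(input.getArity, sum)
    out.collect(output)
  }
}
{code}

Under these assumptions it would be applied per partition, e.g. stream.keyBy(partitionKeys: _*).process(new RowsBoundedSumSketch(2, 1)) if column b sits at index 1.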