[ https://issues.apache.org/jira/browse/FLINK-5990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938632#comment-15938632 ]
ASF GitHub Bot commented on FLINK-5990: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3585#discussion_r107668755 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -99,28 +100,64 @@ class DataStreamOverAggregate( .getFieldList .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex) .getValue - timeType match { case _: ProcTimeType => - // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition. - if (overWindow.lowerBound.isUnbounded && - overWindow.upperBound.isCurrentRow) { + // proc-time OVER window + if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { + // non-bounded OVER window createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS) + } else if ( + overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded && + overWindow.upperBound.isCurrentRow) { + // bounded OVER window + if (overWindow.isRows) { + // ROWS clause bounded OVER window + createRowsClauseBoundedAndCurrentRowOverWindow(inputDS) + } else { + // RANGE clause bounded OVER window + throw new TableException( + "RANGE clause bounded proc-time OVER window no supported yet.") + } } else { throw new TableException( - "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " + - "condition.") + "proc-time OVER window only support CURRENT ROW condition.") } case _: RowTimeType => - throw new TableException("OVER Window of the EventTime type is not currently supported.") + // row-time OVER window + if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { + // non-bounded OVER window + if (overWindow.isRows) { --- End diff -- I don't think we need the rows / range distinction for unbounded-currentRow windows. > Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL > ----------------------------------------------------------------------------- > > Key: FLINK-5990 > URL: https://issues.apache.org/jira/browse/FLINK-5990 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: sunjincheng > Assignee: sunjincheng > > The goal of this issue is to add support for OVER ROWS aggregations on event > time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND > CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND > CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is required > - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a > parameterless scalar function that just indicates event time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5803) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)