[ https://issues.apache.org/jira/browse/FLINK-5990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15936485#comment-15936485 ]
ASF GitHub Bot commented on FLINK-5990:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3585#discussion_r107433959
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for ROWS clause event-time bounded OVER window
+ *
+ * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
+ *                   used for this aggregation
+ * @param aggFields the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param precedingOffset the preceding offset
+ */
+class RowsClauseBoundedOverProcessFunction(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int,
+ private val aggregationStateType: RowTypeInfo,
+ private val precedingOffset: Int)
+ extends ProcessFunction[Row, Row] {
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+ Preconditions.checkNotNull(forwardedFieldCount)
+ Preconditions.checkNotNull(aggregationStateType)
+ Preconditions.checkNotNull(precedingOffset)
+
+ private var output: Row = _
+
+ // the state which keeps the last triggering timestamp
+ private var lastTriggeringTsState: ValueState[Long] = _
+
+ // the state which keeps the count of data
+ private var dataCountState: ValueState[Long] = _
+
+ // the state which is used to materialize the accumulator for incremental calculation
+ private var accumulatorState: ValueState[Row] = _
+
+ // the state which keeps all the data that are not expired.
+ // The first element (as the mapState key) of the tuple is the timestamp. Per each timestamp,
+ // the second element of the tuple is a list that contains the entire data of all the rows
+ // belonging to this timestamp.
+ private var dataState: MapState[Long, JList[Row]] = _
+
+ override def open(config: Configuration) {
+
+ output = new Row(forwardedFieldCount + aggregates.length)
+
+ val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+ lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+ val dataCountStateDescriptor =
+ new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
+ dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
+
+ val accumulatorStateDescriptor =
+ new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+ accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]](
+ "dataState",
+ classOf[Long],
+ classOf[JList[Row]])
+
+ dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ }
+
+ override def processElement(
+ input: Row,
+ ctx: ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+ // triggering timestamp for trigger calculation
+ val triggeringTs = ctx.timestamp
+
+ val lastTriggeringTs = lastTriggeringTsState.value
+ // check if the data is expired; if not, save the data and register an event-time timer
+ if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
--- End diff --
`triggeringTs > lastTriggeringTs` will be sufficient, actually even
`triggeringTs >= lastTriggeringTs` would work. If `triggeringTs` is the
timestamp of the last element that we emitted, we can still emit the row even
if it is smaller than the current watermark.
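To make the suggestion concrete, here is a small standalone sketch (plain
Scala, not code from the pull request; the variable names mirror the diff
above and the sample timestamps are made up) contrasting the original guard
with the relaxed one:
{code}
object GuardSketch {

  // original condition from the diff: drops rows at or below the current watermark
  def originalGuard(triggeringTs: Long, lastTriggeringTs: Long, currentWatermark: Long): Boolean =
    triggeringTs > lastTriggeringTs && triggeringTs > currentWatermark

  // relaxed condition suggested above: the watermark check is dropped, so a row
  // that is older than the current watermark but newer than the last firing is
  // still buffered; its event-time timer then fires on the next watermark
  def relaxedGuard(triggeringTs: Long, lastTriggeringTs: Long): Boolean =
    triggeringTs > lastTriggeringTs

  def main(args: Array[String]): Unit = {
    // a row with ts=10 arrives after the watermark has advanced to 12,
    // while the last firing happened at ts=5
    println(originalGuard(10L, 5L, 12L)) // false -> the row would be dropped
    println(relaxedGuard(10L, 5L))       // true  -> the row can still be emitted
  }
}
{code}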
> Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> -----------------------------------------------------------------------------
>
> Key: FLINK-5990
> URL: https://issues.apache.org/jira/browse/FLINK-5990
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER ROWS aggregations on event
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is required
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a
> parameterless scalar function that just indicates event time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5803)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some
> of the restrictions are trivial to address, we can add the functionality in
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with
> RexOver expression).
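As an aside for readers of this thread, here is a minimal standalone sketch
(plain Scala, not the Flink operator; the data values are made up) of what
the ROWS BETWEEN 2 PRECEDING AND CURRENT ROW clause in the query above
computes per partition, i.e. each aggregate covers at most the two preceding
rows plus the current row:
{code}
object RowsWindowSketch {
  def main(args: Array[String]): Unit = {
    // b values of one partition (same key c), already ordered by rowTime()
    val bValues = Seq(4, 1, 5, 2)

    bValues.indices.foreach { i =>
      // window of at most 3 rows: 2 preceding + the current row
      val window = bValues.slice(math.max(0, i - 2), i + 1)
      println(s"row ${i + 1}: sumB=${window.sum} minB=${window.min}")
    }
    // row 1: sumB=4  minB=4
    // row 2: sumB=5  minB=1
    // row 3: sumB=10 minB=1
    // row 4: sumB=8  minB=1
  }
}
{code}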
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)