[ https://issues.apache.org/jira/browse/FLINK-5655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945517#comment-15945517 ]
ASF GitHub Bot commented on FLINK-5655:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3629#discussion_r108467898
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount the count of forwarded fields
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType the row type info of input row
+ * @param precedingOffset the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int,
+ private val aggregationStateType: RowTypeInfo,
+ private val inputRowType: RowTypeInfo,
+ private val precedingOffset: Long)
+ extends ProcessFunction[Row, Row] {
+
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(aggFields)
+ Preconditions.checkArgument(aggregates.length == aggFields.length)
+ Preconditions.checkNotNull(forwardedFieldCount)
+ Preconditions.checkNotNull(aggregationStateType)
+ Preconditions.checkNotNull(precedingOffset)
+
+ private var output: Row = _
+
+ // the state which keeps the last triggering timestamp
+ private var lastTriggeringTsState: ValueState[Long] = _
+
+ // the state which is used to materialize the accumulator for incremental calculation
+ private var accumulatorState: ValueState[Row] = _
+
+ // the state which keeps all the data that is not expired.
+ // The first element (the mapState key) is the timestamp. For each timestamp,
+ // the second element is a list that contains all rows belonging to this timestamp.
+ private var dataState: MapState[Long, JList[Row]] = _
+
+ override def open(config: Configuration) {
+
+ output = new Row(forwardedFieldCount + aggregates.length)
+
+ val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+ lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+ val accumulatorStateDescriptor =
+ new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+ accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+ val keyTypeInformation: TypeInformation[Long] =
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+ val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]](
+ "dataState",
+ keyTypeInformation,
+ valueTypeInformation)
+
+ dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ }
+
+ override def processElement(
+ input: Row,
+ ctx: ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+ // the triggering timestamp of the current record
+ val triggeringTs = ctx.timestamp
+
+ val lastTriggeringTs = lastTriggeringTsState.value
+
+ // if the data is not expired, save it and register an event-time timer
+ if (triggeringTs > lastTriggeringTs) {
+ val data = dataState.get(triggeringTs)
+ if (null != data) {
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ } else {
+ val data = new JArrayList[Row]
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ // register event time timer
+ ctx.timerService.registerEventTimeTimer(triggeringTs)
+ }
+ }
+ }
+
+ override def onTimer(
+ timestamp: Long,
+ ctx: ProcessFunction[Row, Row]#OnTimerContext,
+ out: Collector[Row]): Unit = {
+ // gets all window data from state for the calculation
+ val inputs: JList[Row] = dataState.get(timestamp)
+
+ if (null != inputs) {
+
+ var accumulators = accumulatorState.value
+ var dataListIndex = 0
+ var aggregatesIndex = 0
+
+ // initialize the accumulators on the first run or after failover recovery (per key)
+ if (null == accumulators) {
+ accumulators = new Row(aggregates.length)
+ aggregatesIndex = 0
+ while (aggregatesIndex < aggregates.length) {
+ accumulators.setField(aggregatesIndex, aggregates(aggregatesIndex).createAccumulator())
+ aggregatesIndex += 1
+ }
+ }
+
+ // collect the timestamps of the data to be retracted
+ val retractTsList: JList[Long] = new JArrayList[Long]
+
+ val dataTimestampIt = dataState.keys.iterator
+ while (dataTimestampIt.hasNext) {
+ val dataTs: Long = dataTimestampIt.next()
+ val offset = timestamp - dataTs
+ if (offset > precedingOffset) {
+ val retractDataList = dataState.get(dataTs)
+ dataListIndex = 0
+ while (dataListIndex < retractDataList.size()) {
+ aggregatesIndex = 0
+ while (aggregatesIndex < aggregates.length) {
+ val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
+ aggregates(aggregatesIndex).retract(
+ accumulator, retractDataList.get(dataListIndex).getField(aggFields(aggregatesIndex)))
+ aggregatesIndex += 1
+ }
+ dataListIndex += 1
+ }
+ // record the timestamp once (not once per row) for removal below
+ retractTsList.add(dataTs)
+ }
+ }
+
+ // remove the data that has been retracted
+ dataListIndex = 0
+ while (dataListIndex < retractTsList.size) {
+ dataState.remove(retractTsList.get(dataListIndex))
+ dataListIndex += 1
+ }
+
+ // copy forwarded fields to output row
+ aggregatesIndex = 0
+ while (aggregatesIndex < forwardedFieldCount) {
+ output.setField(aggregatesIndex, inputs.get(0).getField(aggregatesIndex))
+ aggregatesIndex += 1
+ }
+
+ dataListIndex = 0
+ while (dataListIndex < inputs.size()) {
+ // accumulate current row and set aggregate in output row
+ aggregatesIndex = 0
+ while (aggregatesIndex < aggregates.length) {
+ val index = forwardedFieldCount + aggregatesIndex
+ val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
+ aggregates(aggregatesIndex).accumulate(
+ accumulator, inputs.get(dataListIndex).getField(aggFields(aggregatesIndex)))
+ if (dataListIndex >= (inputs.size() - 1)) {
+ output.setField(index, aggregates(aggregatesIndex).getValue(accumulator))
+ }
+ aggregatesIndex += 1
+ }
+ dataListIndex += 1
+ }
+
+ dataListIndex = 0
+ while (dataListIndex < inputs.size()) {
+ out.collect(output)
--- End diff ---
We have to copy the forwarded fields for each row in `inputs` because each row might be different. The aggregates can be reused because they are the same for all records with the same timestamp.
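For illustration, a minimal sketch of what that emit loop could look like (this reuses the variable names from the diff, introduces a local fieldIndex counter for clarity, and assumes the aggregate fields of `output` were already set in the accumulation loop above; it is a sketch, not the committed fix):
{code}
// emit one output row per input row: copy the forwarded fields per row,
// since they may differ, and reuse the aggregate fields set above, since
// the aggregate results are identical for all rows with the same timestamp
dataListIndex = 0
while (dataListIndex < inputs.size()) {
  val row = inputs.get(dataListIndex)
  var fieldIndex = 0
  while (fieldIndex < forwardedFieldCount) {
    output.setField(fieldIndex, row.getField(fieldIndex))
    fieldIndex += 1
  }
  out.collect(output)
  dataListIndex += 1
}
{code}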
> Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> ----------------------------------------------------------------
>
> Key: FLINK-5655
> URL: https://issues.apache.org/jira/browse/FLINK-5655
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER RANGE aggregations on event
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a parameterless scalar function that just indicates event time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5658)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow-up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER RANGE aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)