[ https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828173#comment-15828173 ]
ASF GitHub Bot commented on FLINK-4693:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/3150#discussion_r96645923
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+
+/**
+ * It wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
+ * on batch.
+ *
+ * @param aggregates The aggregate functions.
+ * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+ * and output Row.
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value
+ * index in output Row.
+ * @param intermediateRowArity The intermediate row field count.
+ * @param finalRowArity The output row field count.
+ * @param finalRowWindowStartPos The relative window-start field position.
+ * @param finalRowWindowEndPos The relative window-end field position.
+ * @param gap Session time window gap.
+ */
+class DataSetSessionWindowAggregateReduceGroupFunction(
+ aggregates: Array[Aggregate[_ <: Any]],
+ groupKeysMapping: Array[(Int, Int)],
+ aggregateMapping: Array[(Int, Int)],
+ intermediateRowArity: Int,
+ finalRowArity: Int,
+ finalRowWindowStartPos: Option[Int],
+ finalRowWindowEndPos: Option[Int],
+ gap:Long)
+ extends RichGroupReduceFunction[Row, Row] {
+
+ private var aggregateBuffer: Row = _
+ private var output: Row = _
+ private var collector: TimeWindowPropertyCollector = _
+ private var intermediateRowWindowStartPos = 0
+ private var intermediateRowWindowEndPos = 0
+
+ override def open(config: Configuration) {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupKeysMapping)
+ aggregateBuffer = new Row(intermediateRowArity)
+ intermediateRowWindowStartPos = intermediateRowArity - 2
+ intermediateRowWindowEndPos = intermediateRowArity - 1
+ output = new Row(finalRowArity)
+ collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+ }
+
+ /**
+ * For grouped intermediate aggregate Rows, divide window according to the window-start
+ * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
+ * aggregated values output from aggregate buffer, and then set them into output
+ * Row based on the mapping relationship between intermediate aggregate data and output data.
+ *
+ * @param records Grouped intermediate aggregate Rows iterator.
+ * @param out The collector to hand results to.
+ *
+ */
+ override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+ var last: Row = null
+ var head: Row = null
+ var lastWindowEnd: Option[Long] = None
+ var currentWindowStart: Option[Long] = None
+
+ records.foreach(
+ (record) => {
+ currentWindowStart =
+ Some(record.getField(intermediateRowWindowStartPos).asInstanceOf[Long])
+ // initial traversal or opening a new window
+ if (lastWindowEnd.isEmpty ||
+ (lastWindowEnd.isDefined && currentWindowStart.get > lastWindowEnd.get)) {
+
+ // calculate the current window and open a new window
+ if (lastWindowEnd.isDefined) {
+
+ // evaluate and emit the current window's result.
+ doEvaluateAndCollect(out, last, head)
+ }
+ // initiate intermediate aggregate value.
+ aggregates.foreach(_.initiate(aggregateBuffer))
+ head = record
+ }
+
+ aggregates.foreach(_.merge(record, aggregateBuffer))
+ last = record
+ lastWindowEnd = Some(getWindowEnd(last))
+ })
+
+ doEvaluateAndCollect(out, last, head)
+
+ }
+
+ def doEvaluateAndCollect(
+ out: Collector[Row],
+ last: Row,
+ head: Row): Unit = {
+ // set group keys value to final output.
+ groupKeysMapping.foreach {
+ case (after, previous) =>
+ output.setField(after, last.getField(previous))
+ }
+
+ // evaluate final aggregate value and set to output.
+ aggregateMapping.foreach {
+ case (after, previous) =>
+ output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
+ }
+
+ // adds TimeWindow properties to output then emit output
+ if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
+ val start =
+ head.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
+ val end = getWindowEnd(last)
+
+ collector.wrappedCollector = out
+ collector.timeWindow = new TimeWindow(start, end)
--- End diff --
Yes, I think this is nicer.
> Add session group-windows for batch tables
> -------------------------------------------
>
> Key: FLINK-4693
> URL: https://issues.apache.org/jira/browse/FLINK-4693
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)