[ https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828010#comment-15828010 ]
ASF GitHub Bot commented on FLINK-4693:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/3150#discussion_r96629837
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -226,27 +225,102 @@ object AggregateUtil {
aggregates,
groupingOffsetMapping,
aggOffsetMapping,
- intermediateRowArity,
+ intermediateRowArity + 1,// the addition one field is used to store time attribute
outputType.getFieldCount)
+
+ case EventTimeSessionGroupWindow(_, _, gap) =>
+ val (startPos, endPos) = if (isTimeWindow(window)) {
+ computeWindowStartEndPropertyPos(properties)
+ } else {
+ (None, None)
+ }
+ new DataSetSessionWindowAggregateReduceGroupFunction(
+ aggregates,
+ groupingOffsetMapping,
+ aggOffsetMapping,
+ // the addition two fields are used to store window-start and window-end attributes
+ intermediateRowArity + 2,
+ outputType.getFieldCount,
+ startPos,
+ endPos,
+ asLong(gap))
case _ =>
throw new UnsupportedOperationException(s"$window is currently not supported on batch")
}
}
/**
+ * Create a [[org.apache.flink.api.common.functions.GroupCombineFunction]] that pre-aggregation
+ * for aggregates.
+ * The function returns intermediate aggregate values of all aggregate function which are
+ * organized by the following format:
+ *
+ * {{{
+ *             avg(x) aggOffsetInRow = 2   count(z) aggOffsetInRow = 5
+ *                         |                          |          windowEnd(max(row-time)
+ *                         |                          |                     |
+ *                         v                          v                     v
+ * +---------+---------+--------+--------+--------+--------+-----------+---------+
+ * |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
+ * +---------+---------+--------+--------+--------+--------+-----------+---------+
+ *                                           ^                   ^
+ *                                           |                   |
+ *                             sum(y) aggOffsetInRow = 4    windowStart(min(row-time))
+ *
+ * }}}
+ *
+ */
+
--- End diff --
Remove empty line.
> Add session group-windows for batch tables
> -------------------------------------------
>
> Key: FLINK-4693
> URL: https://issues.apache.org/jira/browse/FLINK-4693
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)